locking: Change locking order, move NAL after instances
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
# Lock used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
_decoratorlock = locking.SharedLock("decorator lock")

#: List for looping tests; L{_Repeat} runs the decorated function once per
#: entry, i.e. eight times
ITERATIONS = range(8)
51
52
53 def _Repeat(fn):
54   """Decorator for executing a function many times"""
55   def wrapper(*args, **kwargs):
56     for i in ITERATIONS:
57       fn(*args, **kwargs)
58   return wrapper
59
60
def SafeSleep(duration):
  """Sleep until at least C{duration} seconds have passed.

  C{time.sleep} is called again with the remaining time for as long as the
  deadline, measured with C{time.time}, hasn't been reached. Returns
  immediately for a zero or negative duration.

  @param duration: Number of seconds to sleep

  """
  deadline = time.time() + duration

  remaining = deadline - time.time()
  while remaining > 0.0:
    time.sleep(remaining)
    remaining = deadline - time.time()
68
69
70 class _ThreadedTestCase(unittest.TestCase):
71   """Test class that supports adding/waiting on threads"""
72   def setUp(self):
73     unittest.TestCase.setUp(self)
74     self.done = Queue.Queue(0)
75     self.threads = []
76
77   def _addThread(self, *args, **kwargs):
78     """Create and remember a new thread"""
79     t = threading.Thread(*args, **kwargs)
80     self.threads.append(t)
81     t.start()
82     return t
83
84   def _waitThreads(self):
85     """Wait for all our threads to finish"""
86     for t in self.threads:
87       t.join(60)
88       self.failIf(t.isAlive())
89     self.threads = []
90
91
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the tests of all condition classes.

  The condition class under test is passed to L{setUp} by the subclass.

  """

  def setUp(self, cls):
    """Create a plain lock and a condition of type C{cls} using it."""
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _testAcquireRelease(self):
    """Check ownership reporting around acquire, notifyAll and release."""
    cond = self.cond

    def _CheckNotOwned():
      # Without owning the condition, waiting and notifying are refused
      self.assertFalse(cond._is_owned())
      self.assertRaises(RuntimeError, cond.wait, None)
      self.assertRaises(RuntimeError, cond.notifyAll)

    _CheckNotOwned()

    cond.acquire()
    self.assert_(cond._is_owned())
    cond.notifyAll()
    self.assert_(cond._is_owned())
    cond.release()

    _CheckNotOwned()

  def _testNotification(self):
    """Check that a waiting thread is woken up by notifyAll.

    The helper thread reports "NE" when started, "NA" after acquiring the
    condition and "NN" after notifying.

    """
    cond = self.cond
    report = self.done.put

    def _NotifyAll():
      report("NE")
      cond.acquire()
      report("NA")
      cond.notifyAll()
      report("NN")
      cond.release()

    cond.acquire()
    self._addThread(target=_NotifyAll)
    self.assertEqual(self.done.get(True, 1), "NE")
    # The notifier can't get any further while we hold the condition
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    cond.wait(None)
    for expected in ("NA", "NN"):
      self.assertEqual(self.done.get(True, 1), expected)
    self.assert_(cond._is_owned())
    cond.release()
    self.assertFalse(cond._is_owned())
134
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """Tests for locking.SingleNotifyPipeCondition"""

  def setUp(self):
    """Run the common setup with the single-notify condition class."""
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    # Waiting more than once must work as long as no notification was sent
    cond = self.cond
    cond.acquire()
    for timeout in (0, 0.1):
      cond.wait(timeout)
    cond.release()

  def testNoNotifyReuse(self):
    cond = self.cond
    cond.acquire()
    cond.notifyAll()
    # After the notification both waiting and notifying again are refused
    for (fn, args) in ((cond.wait, (None, )), (cond.notifyAll, ())):
      self.assertRaises(RuntimeError, fn, *args)
    cond.release()
160
161
class TestPipeCondition(_ConditionTestCase):
  """Tests for locking.PipeCondition"""

  def setUp(self):
    """Run the common setup with the pipe condition class."""
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Check waiting and notification using several threads running C{fn}.

    @param fn: Thread function; it has to put "A" on the done queue while
      holding the condition, wait on the condition and put "W" once woken up

    """
    cond = self.cond
    waiting = [self._addThread(target=fn) for _ in range(3)]

    # Every thread reports "A" right before it starts waiting
    for _ in waiting:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    cond.acquire()
    self.assertEqual(len(cond._waiters), 3)
    self.assertEqual(cond._waiters, set(waiting))

    description = repr(cond)
    self.assertTrue(description.startswith("<"))
    self.assertTrue("waiters=" in description)

    # A thread started now is unable to acquire the lock, and therefore to
    # wait, until the lock is released below
    self._addThread(target=fn)
    cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    cond.release()

    # Expect "W" from the three woken threads and "A" from the new one, in
    # any order
    seen = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in seen:
        self.fail("Got %s on the done queue" % got)
      seen[got] += 1

    self.assertEqual(seen["W"], 3)
    self.assertEqual(seen["A"], 1)

    # Wake up the late thread as well
    cond.acquire()
    cond.notifyAll()
    cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testBlockingWait(self):
    cond = self.cond
    report = self.done.put

    def _BlockingWait():
      cond.acquire()
      report("A")
      cond.wait(None)
      cond.release()
      report("W")

    self._TestWait(_BlockingWait)

  def testLongTimeoutWait(self):
    cond = self.cond
    report = self.done.put

    def _Helper():
      cond.acquire()
      report("A")
      cond.wait(15.0)
      cond.release()
      report("W")

    self._TestWait(_Helper)

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition for C{timeout}, then report C{check}."""
    cond = self.cond
    cond.acquire()
    cond.wait(timeout)
    cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
266
267
268 class TestSharedLock(_ThreadedTestCase):
269   """SharedLock tests"""
270
271   def setUp(self):
272     _ThreadedTestCase.setUp(self)
273     self.sl = locking.SharedLock("TestSharedLock")
274
275     self.assertTrue(repr(self.sl).startswith("<"))
276     self.assertTrue("name=TestSharedLock" in repr(self.sl))
277
278   def testSequenceAndOwnership(self):
279     self.assertFalse(self.sl.is_owned())
280     self.sl.acquire(shared=1)
281     self.assert_(self.sl.is_owned())
282     self.assert_(self.sl.is_owned(shared=1))
283     self.assertFalse(self.sl.is_owned(shared=0))
284     self.sl.release()
285     self.assertFalse(self.sl.is_owned())
286     self.sl.acquire()
287     self.assert_(self.sl.is_owned())
288     self.assertFalse(self.sl.is_owned(shared=1))
289     self.assert_(self.sl.is_owned(shared=0))
290     self.sl.release()
291     self.assertFalse(self.sl.is_owned())
292     self.sl.acquire(shared=1)
293     self.assert_(self.sl.is_owned())
294     self.assert_(self.sl.is_owned(shared=1))
295     self.assertFalse(self.sl.is_owned(shared=0))
296     self.sl.release()
297     self.assertFalse(self.sl.is_owned())
298
299   def testBooleanValue(self):
300     # semaphores are supposed to return a true value on a successful acquire
301     self.assert_(self.sl.acquire(shared=1))
302     self.sl.release()
303     self.assert_(self.sl.acquire())
304     self.sl.release()
305
306   def testDoubleLockingStoE(self):
307     self.sl.acquire(shared=1)
308     self.assertRaises(AssertionError, self.sl.acquire)
309
310   def testDoubleLockingEtoS(self):
311     self.sl.acquire()
312     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
313
314   def testDoubleLockingStoS(self):
315     self.sl.acquire(shared=1)
316     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
317
318   def testDoubleLockingEtoE(self):
319     self.sl.acquire()
320     self.assertRaises(AssertionError, self.sl.acquire)
321
322   # helper functions: called in a separate thread they acquire the lock, send
323   # their identifier on the done queue, then release it.
324   def _doItSharer(self):
325     try:
326       self.sl.acquire(shared=1)
327       self.done.put("SHR")
328       self.sl.release()
329     except errors.LockError:
330       self.done.put("ERR")
331
332   def _doItExclusive(self):
333     try:
334       self.sl.acquire()
335       self.done.put("EXC")
336       self.sl.release()
337     except errors.LockError:
338       self.done.put("ERR")
339
340   def _doItDelete(self):
341     try:
342       self.sl.delete()
343       self.done.put("DEL")
344     except errors.LockError:
345       self.done.put("ERR")
346
347   def testSharersCanCoexist(self):
348     self.sl.acquire(shared=1)
349     threading.Thread(target=self._doItSharer).start()
350     self.assert_(self.done.get(True, 1))
351     self.sl.release()
352
  @_Repeat
  def testExclusiveBlocksExclusive(self):
    # Hold the lock exclusively while a helper thread tries an exclusive
    # acquire as well
    self.sl.acquire()
    self._addThread(target=self._doItExclusive)
    # The helper must not have reported anything while we own the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # After the release the helper got the lock and reported success
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
361
  @_Repeat
  def testExclusiveBlocksDelete(self):
    # Hold the lock exclusively while a helper thread tries to delete it
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    # The helper must not have reported anything while we own the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # After the release the helper deleted the lock
    self.failUnlessEqual(self.done.get_nowait(), "DEL")
    # Replace the deleted lock, the test body is run again by _Repeat
    self.sl = locking.SharedLock(self.sl.name)
371
  @_Repeat
  def testExclusiveBlocksSharer(self):
    # Hold the lock exclusively while a helper thread tries a shared acquire
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    # The helper must not have reported anything while we own the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # After the release the helper got the lock in shared mode
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
380
  @_Repeat
  def testSharerBlocksExclusive(self):
    # Hold the lock in shared mode while a helper thread tries an exclusive
    # acquire
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    # The helper must not have reported anything while we own the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # After the release the helper got the lock and reported success
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
389
  @_Repeat
  def testSharerBlocksDelete(self):
    # Hold the lock in shared mode while a helper thread tries to delete it
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItDelete)
    # The helper must not have reported anything while we own the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # After the release the helper deleted the lock
    self.failUnlessEqual(self.done.get_nowait(), "DEL")
    # Replace the deleted lock, the test body is run again by _Repeat
    self.sl = locking.SharedLock(self.sl.name)
399
  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # The test is disabled: due to the unconditional return, none of the code
    # below is ever executed. It's kept to document the behaviour which would
    # be verified.
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
417
  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # The test is disabled: due to the unconditional return, none of the code
    # below is ever executed. It's kept to document the behaviour which would
    # be verified.
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), "SHR")
    self.assertEqual(self.done.get_nowait(), "EXC")
435
436   def testDelete(self):
437     self.sl.delete()
438     self.assertRaises(errors.LockError, self.sl.acquire)
439     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
440     self.assertRaises(errors.LockError, self.sl.delete)
441
442   def testDeleteTimeout(self):
443     self.assertTrue(self.sl.delete(timeout=60))
444
  def testDeleteTimeoutFail(self):
    # "ready" is set by the helper thread once it owns the lock, "finish"
    # tells it to release the lock again
    ready = threading.Event()
    finish = threading.Event()

    def fn():
      self.sl.acquire(shared=0)
      ready.set()

      finish.wait()
      self.sl.release()

    self._addThread(target=fn)
    ready.wait()

    # Test if deleting a lock owned in exclusive mode by another thread fails
    # to delete when a timeout is used
    self.assertFalse(self.sl.delete(timeout=0.02))

    # Let the helper thread release the lock and finish
    finish.set()
    self._waitThreads()

    # With the lock being free again, deletion succeeds and the lock becomes
    # unusable
    self.assertTrue(self.sl.delete())
    self.assertRaises(errors.LockError, self.sl.acquire)
468
469   def testNoDeleteIfSharer(self):
470     self.sl.acquire(shared=1)
471     self.assertRaises(AssertionError, self.sl.delete)
472
473   @_Repeat
474   def testDeletePendingSharersExclusiveDelete(self):
475     self.sl.acquire()
476     self._addThread(target=self._doItSharer)
477     self._addThread(target=self._doItSharer)
478     self._addThread(target=self._doItExclusive)
479     self._addThread(target=self._doItDelete)
480     self.sl.delete()
481     self._waitThreads()
482     # The threads who were pending return ERR
483     for _ in range(4):
484       self.assertEqual(self.done.get_nowait(), "ERR")
485     self.sl = locking.SharedLock(self.sl.name)
486
487   @_Repeat
488   def testDeletePendingDeleteExclusiveSharers(self):
489     self.sl.acquire()
490     self._addThread(target=self._doItDelete)
491     self._addThread(target=self._doItExclusive)
492     self._addThread(target=self._doItSharer)
493     self._addThread(target=self._doItSharer)
494     self.sl.delete()
495     self._waitThreads()
496     # The two threads who were pending return both ERR
497     self.assertEqual(self.done.get_nowait(), "ERR")
498     self.assertEqual(self.done.get_nowait(), "ERR")
499     self.assertEqual(self.done.get_nowait(), "ERR")
500     self.assertEqual(self.done.get_nowait(), "ERR")
501     self.sl = locking.SharedLock(self.sl.name)
502
  @_Repeat
  def testExclusiveAcquireTimeout(self):
    # The second acquire is tried once in exclusive and once in shared mode
    for shared in [0, 1]:
      # NOTE(review): on_queue is passed as test_notify callback below, but
      # nothing in this test waits for it
      on_queue = threading.Event()
      release_exclusive = threading.Event()

      def _LockExclusive():
        self.sl.acquire(shared=0, test_notify=on_queue.set)
        self.done.put("A: start wait")
        release_exclusive.wait()
        self.done.put("A: end wait")
        self.sl.release()

      # Start thread to hold lock in exclusive mode
      self._addThread(target=_LockExclusive)

      # Wait for wait to begin
      self.assertEqual(self.done.get(timeout=60), "A: start wait")

      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
      # on the queue
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
                                      test_notify=release_exclusive.set))

      self.done.put("got 2nd")
      self.sl.release()

      self._waitThreads()

      # The helper thread must have finished waiting before this thread got
      # the lock
      self.assertEqual(self.done.get_nowait(), "A: end wait")
      self.assertEqual(self.done.get_nowait(), "got 2nd")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
535
  @_Repeat
  def testAcquireExpiringTimeout(self):
    def _AcquireWithTimeout(shared, timeout):
      # Reports "timeout" if the lock couldn't be acquired in time; nothing is
      # reported (and the lock isn't released) if the acquire succeeded
      if not self.sl.acquire(shared=shared, timeout=timeout):
        self.done.put("timeout")

    # The helper threads acquire in exclusive mode in the first and in shared
    # mode in the second round
    for shared in [0, 1]:
      # Lock exclusively
      self.sl.acquire()

      # Start acquires (mode as selected by the outer loop) with timeout
      # between 0 and 20 ms
      for i in range(11):
        self._addThread(target=_AcquireWithTimeout,
                        args=(shared, i * 2.0 / 1000.0))

      # Wait for threads to finish (makes sure the acquire timeout expires
      # before releasing the lock)
      self._waitThreads()

      # Release lock
      self.sl.release()

      # Every one of the eleven acquires must have timed out
      for _ in range(11):
        self.assertEqual(self.done.get_nowait(), "timeout")

      self.assertRaises(Queue.Empty, self.done.get_nowait)
562
563   @_Repeat
564   def testSharedSkipExclusiveAcquires(self):
565     # Tests whether shared acquires jump in front of exclusive acquires in the
566     # queue.
567
568     def _Acquire(shared, name, notify_ev, wait_ev):
569       if notify_ev:
570         notify_fn = notify_ev.set
571       else:
572         notify_fn = None
573
574       if wait_ev:
575         wait_ev.wait()
576
577       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
578         return
579
580       self.done.put(name)
581       self.sl.release()
582
583     # Get exclusive lock while we fill the queue
584     self.sl.acquire()
585
586     shrcnt1 = 5
587     shrcnt2 = 7
588     shrcnt3 = 9
589     shrcnt4 = 2
590
591     # Add acquires using threading.Event for synchronization. They'll be
592     # acquired exactly in the order defined in this list.
593     acquires = (shrcnt1 * [(1, "shared 1")] +
594                 3 * [(0, "exclusive 1")] +
595                 shrcnt2 * [(1, "shared 2")] +
596                 shrcnt3 * [(1, "shared 3")] +
597                 shrcnt4 * [(1, "shared 4")] +
598                 3 * [(0, "exclusive 2")])
599
600     ev_cur = None
601     ev_prev = None
602
603     for args in acquires:
604       ev_cur = threading.Event()
605       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
606       ev_prev = ev_cur
607
608     # Wait for last acquire to start
609     ev_prev.wait()
610
611     # Expect 6 pending exclusive acquires and 1 for all shared acquires
612     # together
613     self.assertEqual(self.sl._count_pending(), 7)
614
615     # Release exclusive lock and wait
616     self.sl.release()
617
618     self._waitThreads()
619
620     # Check sequence
621     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
622       # Shared locks aren't guaranteed to be notified in order, but they'll be
623       # first
624       tmp = self.done.get_nowait()
625       if tmp == "shared 1":
626         shrcnt1 -= 1
627       elif tmp == "shared 2":
628         shrcnt2 -= 1
629       elif tmp == "shared 3":
630         shrcnt3 -= 1
631       elif tmp == "shared 4":
632         shrcnt4 -= 1
633     self.assertEqual(shrcnt1, 0)
634     self.assertEqual(shrcnt2, 0)
635     self.assertEqual(shrcnt3, 0)
636     self.assertEqual(shrcnt3, 0)
637
638     for _ in range(3):
639       self.assertEqual(self.done.get_nowait(), "exclusive 1")
640
641     for _ in range(3):
642       self.assertEqual(self.done.get_nowait(), "exclusive 2")
643
644     self.assertRaises(Queue.Empty, self.done.get_nowait)
645
646   def testIllegalDowngrade(self):
647     # Not yet acquired
648     self.assertRaises(AssertionError, self.sl.downgrade)
649
650     # Acquire in shared mode, downgrade should be no-op
651     self.assertTrue(self.sl.acquire(shared=1))
652     self.assertTrue(self.sl.is_owned(shared=1))
653     self.assertTrue(self.sl.downgrade())
654     self.assertTrue(self.sl.is_owned(shared=1))
655     self.sl.release()
656
657   def testDowngrade(self):
658     self.assertTrue(self.sl.acquire())
659     self.assertTrue(self.sl.is_owned(shared=0))
660     self.assertTrue(self.sl.downgrade())
661     self.assertTrue(self.sl.is_owned(shared=1))
662     self.sl.release()
663
  @_Repeat
  def testDowngradeJumpsAheadOfExclusive(self):
    # Scenario: one thread owns the lock exclusively, a second exclusive
    # acquire and five shared acquires are pending. Once the owner downgrades
    # the lock, the shared acquires must succeed although the exclusive
    # acquire was queued before them.

    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
      # Owner thread: acquires exclusively, downgrades when told to do so and
      # releases when told to do so
      self.assertTrue(self.sl.acquire())
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_got.set()
      ev_downgrade.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.assertTrue(self.sl.downgrade())
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    def _KeepExclusive2(ev_started, ev_release):
      # Second exclusive acquire; ev_started is set through test_notify
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.sl.release()

    def _KeepShared(ev_started, ev_got, ev_release):
      # Shared acquire; ev_started is set through test_notify, ev_got once the
      # lock is owned
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_got.set()
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    # Acquire lock in exclusive mode
    ev_got_excl1 = threading.Event()
    ev_downgrade_excl1 = threading.Event()
    ev_release_excl1 = threading.Event()
    th_excl1 = self._addThread(target=_KeepExclusive,
                               args=(ev_got_excl1, ev_downgrade_excl1,
                                     ev_release_excl1))
    ev_got_excl1.wait()

    # Start a second exclusive acquire
    ev_started_excl2 = threading.Event()
    ev_release_excl2 = threading.Event()
    th_excl2 = self._addThread(target=_KeepExclusive2,
                               args=(ev_started_excl2, ev_release_excl2))
    ev_started_excl2.wait()

    # Start shared acquires, will jump ahead of second exclusive acquire when
    # first exclusive acquire downgrades
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
    ev_release_shared = threading.Event()

    th_shared = [self._addThread(target=_KeepShared,
                                 args=(ev_started, ev_got, ev_release_shared))
                 for (ev_started, ev_got) in ev_shared]

    # Wait for all shared acquires to start
    for (ev, _) in ev_shared:
      ev.wait()

    # Check lock information: owned exclusively by the first thread, with the
    # second exclusive acquire pending ahead of the shared acquires
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [("exclusive", [th_excl2.getName()]),
                      ("shared", sorted(th.getName() for th in th_shared))])

    # Shared acquires won't start until the exclusive lock is downgraded
    ev_downgrade_excl1.set()

    # Wait for all shared acquires to be successful
    for (_, ev) in ev_shared:
      ev.wait()

    # Check lock information again: only the second exclusive acquire is still
    # pending, the lock is owned by the first thread and all sharers
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
                                              query.LQ_PENDING])),
                     [(self.sl.name, "shared", None,
                       [("exclusive", [th_excl2.getName()])])])
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
    self.assertEqual(set(owner), set([th_excl1.getName()] +
                                     [th.getName() for th in th_shared]))

    # Let all threads release the lock and finish
    ev_release_excl1.set()
    ev_release_excl2.set()
    ev_release_shared.set()

    self._waitThreads()

    # The lock must be unused now
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
                                              query.LQ_PENDING])),
                     [(self.sl.name, None, None, [])])
756
  @_Repeat
  def testMixedAcquireTimeout(self):
    # Mixes blocking shared and exclusive acquires done by helper threads with
    # exclusive acquires using a short timeout done by this thread; the latter
    # are expected to fail every time

    # Tells the shared holders to release the lock
    sync = threading.Event()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence: the three initial sharers, then the exclusive acquire
    # and finally the ten late sharers
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)
851
  def testPriority(self):
    # Acquire in exclusive mode
    self.assert_(self.sl.acquire(shared=0))

    # Queue acquires
    def _Acquire(prev, next, shared, priority, result):
      # Wait for the acquire started before this one; "next" is set through
      # the test_notify callback of this acquire
      prev.wait()
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        # One exclusive and one shared acquire per chosen priority, in random
        # order
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged: names and threads are added to the
              # sets of the entry already recorded for this priority
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetLockInfo(set()),
                     [(self.sl.name, None, None, None)])
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive",
                       [threading.currentThread().getName()], None)])

    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
                            perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assert_(self.sl._check_empty())

    # Check acquires by priority
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one. set.remove raises KeyError for a result which
        # doesn't belong to the current entry, i.e. came out of order.
        while names:
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)
949
950   def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
951     self.assertEqual(name, self.sl.name)
952     self.assert_(mode is None)
953     self.assert_(owner is None)
954
955     self.assertEqual([(pendmode, sorted(waiting))
956                       for (pendmode, waiting) in pending],
957                      [(["exclusive", "shared"][int(bool(shared))],
958                        sorted(t.getName() for t in threads))
959                       for acquires in [perprio[i]
960                                        for i in sorted(perprio.keys())]
961                       for (shared, _, threads) in acquires])
962
963   class _FakeTimeForSpuriousNotifications:
964     def __init__(self, now, check_end):
965       self.now = now
966       self.check_end = check_end
967
968       # Deterministic random number generator
969       self.rnd = random.Random(15086)
970
971     def time(self):
972       # Advance time if the random number generator thinks so (this is to test
973       # multiple notifications without advancing the time)
974       if self.rnd.random() < 0.3:
975         self.now += self.rnd.random()
976
977       self.check_end(self.now)
978
979       return self.now
980
  @_Repeat
  def testAcquireTimeoutWithSpuriousNotifications(self):
    # Scenario: the main thread holds the lock exclusively while a second
    # thread tries to acquire it with a timeout. The lock is given a fake
    # clock (see _FakeTimeForSpuriousNotifications); every read of that clock
    # makes "check_end" tell the main thread, via the "req" queue, whether to
    # send one more notification or to finally release the lock.
    # NOTE(review): this assumes the lock reads "_time_fn" each time the
    # waiting thread is woken up -- confirm against locking.SharedLock.

    # Set through the "test_notify" callback of the acquire in the thread
    ready = threading.Event()
    # Set by the thread once it actually got the lock
    locked = threading.Event()
    # Booleans from "check_end": True requests one more notification, False
    # ends the notification loop below
    req = Queue.Queue(0)

    # Start of virtual time and acquire timeout
    epoch = 4000.0
    timeout = 60.0

    def check_end(now):
      # The thread must not have the lock while the clock is still being read
      self.assertFalse(locked.isSet())

      # If we waited long enough (in virtual time), tell main thread to release
      # lock, otherwise tell it to notify once more
      req.put(now < (epoch + (timeout * 0.8)))

    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time

    sl = locking.SharedLock("test", _time_fn=time_fn)

    # Acquire in exclusive mode
    sl.acquire(shared=0)

    def fn():
      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
                                 test_notify=ready.set))
      locked.set()
      sl.release()
      self.done.put("success")

    # Start acquire with timeout and wait for it to be ready
    self._addThread(target=fn)
    ready.wait()

    # The separate thread is now waiting to acquire the lock, so start sending
    # spurious notifications.

    # Wait for separate thread to ask for another notification
    count = 0
    while req.get():
      # After sending the notification, the lock will take a short amount of
      # time to notice and to retrieve the current time
      sl._notify_topmost()
      count += 1

    self.assertTrue(count > 100, "Not enough notifications were sent")

    # None of the notifications may have resulted in the lock being acquired
    self.assertFalse(locked.isSet())

    # Some notifications have been sent, now actually release the lock
    sl.release()

    # Wait for lock to be acquired
    locked.wait()

    self._waitThreads()

    # Exactly one result is expected from the thread
    self.assertEqual(self.done.get_nowait(), "success")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1040
1041
class TestSharedLockInCondition(_ThreadedTestCase):
  """Tests for using a SharedLock as the lock behind a condition"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Creates the condition under test on top of the shared lock"""
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    # After waiting on the condition (with a zero timeout) the lock has to be
    # owned in the same mode as before; checked for shared, then exclusive
    for mode in (1, 0):
      self.cond.acquire(shared=mode)
      self.assertTrue(self.sl.is_owned(shared=mode))
      self.cond.wait(0)
      self.assertTrue(self.sl.is_owned(shared=mode))
      self.cond.release()
1064
1065
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """Runs the condition lock tests against locking.PipeCondition"""

  def setCondition(self):
    """Wraps the shared lock in a PipeCondition"""
    pipe_condition = locking.PipeCondition(self.sl)
    self.cond = pipe_condition
1071
1072
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test

  All tests use the module-level C{_decoratorlock}. Tests acquiring it
  directly release it in a C{finally} clause, so that a failing assertion
  can't leave the lock held for the tests running afterwards.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Reports "EXC" while running under the decorator without sharing"""
    self.assertTrue(_decoratorlock.is_owned())
    self.done.put("EXC")

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Reports "SHR" while running under the decorator in shared mode"""
    self.assertTrue(_decoratorlock.is_owned(shared=1))
    self.done.put("SHR")

  def testDecoratedFunctions(self):
    # The lock must not be owned anymore once a decorated function returned
    self._doItExclusive()
    self.assertFalse(_decoratorlock.is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock.is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    try:
      # The thread is registered like in the other tests so it gets joined
      self._addThread(target=self._doItSharer)
      self.assertTrue(self.done.get(True, 1))
    finally:
      _decoratorlock.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItExclusive)
      # The lock is still held here, so the thread can't have reported yet
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItSharer)
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "SHR")

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    try:
      self._addThread(target=self._doItExclusive)
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")
1128
1129
1130 class TestLockSet(_ThreadedTestCase):
1131   """LockSet tests"""
1132
  def setUp(self):
    # Base class setup first, then a fresh lock set for every test
    _ThreadedTestCase.setUp(self)
    self._setUpLS()
1136
1137   def _setUpLS(self):
1138     """Helper to (re)initialize the lock set"""
1139     self.resources = ["one", "two", "three"]
1140     self.ls = locking.LockSet(self.resources, "TestLockSet")
1141
1142   def testResources(self):
1143     self.assertEquals(self.ls._names(), set(self.resources))
1144     newls = locking.LockSet([], "TestLockSet.testResources")
1145     self.assertEquals(newls._names(), set())
1146
1147   def testCheckOwnedUnknown(self):
1148     self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1149     for shared in [-1, 0, 1, 6378, 24255]:
1150       self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1151                                            shared=shared))
1152
1153   def testCheckOwnedUnknownWhileHolding(self):
1154     self.assertFalse(self.ls.check_owned([]))
1155     self.ls.acquire("one", shared=1)
1156     self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1157     self.assertTrue(self.ls.check_owned("one", shared=1))
1158     self.assertFalse(self.ls.check_owned("one", shared=0))
1159     self.assertFalse(self.ls.check_owned(["one", "two"]))
1160     self.assertRaises(errors.LockError, self.ls.check_owned,
1161                       ["one", "nonexist"])
1162     self.assertRaises(errors.LockError, self.ls.check_owned, "")
1163     self.ls.release()
1164     self.assertFalse(self.ls.check_owned([]))
1165     self.assertFalse(self.ls.check_owned("one"))
1166
1167   def testAcquireRelease(self):
1168     self.assertFalse(self.ls.check_owned(self.ls._names()))
1169     self.assert_(self.ls.acquire("one"))
1170     self.assertEquals(self.ls.list_owned(), set(["one"]))
1171     self.assertTrue(self.ls.check_owned("one"))
1172     self.assertTrue(self.ls.check_owned("one", shared=0))
1173     self.assertFalse(self.ls.check_owned("one", shared=1))
1174     self.ls.release()
1175     self.assertEquals(self.ls.list_owned(), set())
1176     self.assertFalse(self.ls.check_owned(self.ls._names()))
1177     self.assertEquals(self.ls.acquire(["one"]), set(["one"]))
1178     self.assertEquals(self.ls.list_owned(), set(["one"]))
1179     self.ls.release()
1180     self.assertEquals(self.ls.list_owned(), set())
1181     self.ls.acquire(["one", "two", "three"])
1182     self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1183     self.assertTrue(self.ls.check_owned(self.ls._names()))
1184     self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1185     self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1186     self.ls.release("one")
1187     self.assertFalse(self.ls.check_owned(["one"]))
1188     self.assertTrue(self.ls.check_owned(["two", "three"]))
1189     self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1190     self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1191     self.assertEquals(self.ls.list_owned(), set(["two", "three"]))
1192     self.ls.release(["three"])
1193     self.assertEquals(self.ls.list_owned(), set(["two"]))
1194     self.ls.release()
1195     self.assertEquals(self.ls.list_owned(), set())
1196     self.assertEquals(self.ls.acquire(["one", "three"]), set(["one", "three"]))
1197     self.assertEquals(self.ls.list_owned(), set(["one", "three"]))
1198     self.ls.release()
1199     self.assertEquals(self.ls.list_owned(), set())
1200     for name in self.ls._names():
1201       self.assertFalse(self.ls.check_owned(name))
1202
1203   def testNoDoubleAcquire(self):
1204     self.ls.acquire("one")
1205     self.assertRaises(AssertionError, self.ls.acquire, "one")
1206     self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1207     self.assertRaises(AssertionError, self.ls.acquire, ["two", "three"])
1208     self.ls.release()
1209     self.ls.acquire(["one", "three"])
1210     self.ls.release("one")
1211     self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1212     self.ls.release("three")
1213
1214   def testNoWrongRelease(self):
1215     self.assertRaises(AssertionError, self.ls.release)
1216     self.ls.acquire("one")
1217     self.assertRaises(AssertionError, self.ls.release, "two")
1218
1219   def testAddRemove(self):
1220     self.ls.add("four")
1221     self.assertEquals(self.ls.list_owned(), set())
1222     self.assert_("four" in self.ls._names())
1223     self.ls.add(["five", "six", "seven"], acquired=1)
1224     self.assert_("five" in self.ls._names())
1225     self.assert_("six" in self.ls._names())
1226     self.assert_("seven" in self.ls._names())
1227     self.assertEquals(self.ls.list_owned(), set(["five", "six", "seven"]))
1228     self.assertEquals(self.ls.remove(["five", "six"]), ["five", "six"])
1229     self.assert_("five" not in self.ls._names())
1230     self.assert_("six" not in self.ls._names())
1231     self.assertEquals(self.ls.list_owned(), set(["seven"]))
1232     self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
1233     self.ls.remove("seven")
1234     self.assert_("seven" not in self.ls._names())
1235     self.assertEquals(self.ls.list_owned(), set([]))
1236     self.ls.acquire(None, shared=1)
1237     self.assertRaises(AssertionError, self.ls.add, "eight")
1238     self.ls.release()
1239     self.ls.acquire(None)
1240     self.ls.add("eight", acquired=1)
1241     self.assert_("eight" in self.ls._names())
1242     self.assert_("eight" in self.ls.list_owned())
1243     self.ls.add("nine")
1244     self.assert_("nine" in self.ls._names())
1245     self.assert_("nine" not in self.ls.list_owned())
1246     self.ls.release()
1247     self.ls.remove(["two"])
1248     self.assert_("two" not in self.ls._names())
1249     self.ls.acquire("three")
1250     self.assertEquals(self.ls.remove(["three"]), ["three"])
1251     self.assert_("three" not in self.ls._names())
1252     self.assertEquals(self.ls.remove("three"), [])
1253     self.assertEquals(self.ls.remove(["one", "three", "six"]), ["one"])
1254     self.assert_("one" not in self.ls._names())
1255
1256   def testRemoveNonBlocking(self):
1257     self.ls.acquire("one")
1258     self.assertEquals(self.ls.remove("one"), ["one"])
1259     self.ls.acquire(["two", "three"])
1260     self.assertEquals(self.ls.remove(["two", "three"]),
1261                       ["two", "three"])
1262
1263   def testNoDoubleAdd(self):
1264     self.assertRaises(errors.LockError, self.ls.add, "two")
1265     self.ls.add("four")
1266     self.assertRaises(errors.LockError, self.ls.add, "four")
1267
1268   def testNoWrongRemoves(self):
1269     self.ls.acquire(["one", "three"], shared=1)
1270     # Cannot remove "two" while holding something which is not a superset
1271     self.assertRaises(AssertionError, self.ls.remove, "two")
1272     # Cannot remove "three" as we are sharing it
1273     self.assertRaises(AssertionError, self.ls.remove, "three")
1274
1275   def testAcquireSetLock(self):
1276     # acquire the set-lock exclusively
1277     self.assertEquals(self.ls.acquire(None), set(["one", "two", "three"]))
1278     self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1279     self.assertEquals(self.ls.is_owned(), True)
1280     self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1281     # I can still add/remove elements...
1282     self.assertEquals(self.ls.remove(["two", "three"]), ["two", "three"])
1283     self.assert_(self.ls.add("six"))
1284     self.ls.release()
1285     # share the set-lock
1286     self.assertEquals(self.ls.acquire(None, shared=1), set(["one", "six"]))
1287     # adding new elements is not possible
1288     self.assertRaises(AssertionError, self.ls.add, "five")
1289     self.ls.release()
1290
1291   def testAcquireWithRepetitions(self):
1292     self.assertEquals(self.ls.acquire(["two", "two", "three"], shared=1),
1293                       set(["two", "two", "three"]))
1294     self.ls.release(["two", "two"])
1295     self.assertEquals(self.ls.list_owned(), set(["three"]))
1296
1297   def testEmptyAcquire(self):
1298     # Acquire an empty list of locks...
1299     self.assertEquals(self.ls.acquire([]), set())
1300     self.assertEquals(self.ls.list_owned(), set())
1301     # New locks can still be addded
1302     self.assert_(self.ls.add("six"))
1303     # "re-acquiring" is not an issue, since we had really acquired nothing
1304     self.assertEquals(self.ls.acquire([], shared=1), set())
1305     self.assertEquals(self.ls.list_owned(), set())
1306     # We haven't really acquired anything, so we cannot release
1307     self.assertRaises(AssertionError, self.ls.release)
1308
  def _doLockSet(self, names, shared):
    """Acquires and releases locks, reporting the outcome to C{self.done}.

    Puts "DONE" once the acquire returned and "ERR" if an
    L{errors.LockError} was raised by any of the calls.

    @param names: Passed to C{self.ls.acquire} as the names to lock
    @param shared: Passed to C{self.ls.acquire} as the sharing mode

    """
    try:
      self.ls.acquire(names, shared=shared)
      self.done.put("DONE")
      self.ls.release()
    except errors.LockError:
      self.done.put("ERR")
1316
  def _doAddSet(self, names):
    """Adds locks in acquired state, reporting the outcome to C{self.done}.

    Puts "DONE" once the add returned, then releases again; puts "ERR" if an
    L{errors.LockError} was raised by any of the calls.

    @param names: Passed to C{self.ls.add} as the names to add

    """
    try:
      self.ls.add(names, acquired=1)
      self.done.put("DONE")
      self.ls.release()
    except errors.LockError:
      self.done.put("ERR")
1324
1325   def _doRemoveSet(self, names):
1326     self.done.put(self.ls.remove(names))
1327
1328   @_Repeat
1329   def testConcurrentSharedAcquire(self):
1330     self.ls.acquire(["one", "two"], shared=1)
1331     self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1332     self._waitThreads()
1333     self.assertEqual(self.done.get_nowait(), "DONE")
1334     self._addThread(target=self._doLockSet, args=(["one", "two", "three"], 1))
1335     self._waitThreads()
1336     self.assertEqual(self.done.get_nowait(), "DONE")
1337     self._addThread(target=self._doLockSet, args=("three", 1))
1338     self._waitThreads()
1339     self.assertEqual(self.done.get_nowait(), "DONE")
1340     self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1341     self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1342     self.assertRaises(Queue.Empty, self.done.get_nowait)
1343     self.ls.release()
1344     self._waitThreads()
1345     self.assertEqual(self.done.get_nowait(), "DONE")
1346     self.assertEqual(self.done.get_nowait(), "DONE")
1347
1348   @_Repeat
1349   def testConcurrentExclusiveAcquire(self):
1350     self.ls.acquire(["one", "two"])
1351     self._addThread(target=self._doLockSet, args=("three", 1))
1352     self._waitThreads()
1353     self.assertEqual(self.done.get_nowait(), "DONE")
1354     self._addThread(target=self._doLockSet, args=("three", 0))
1355     self._waitThreads()
1356     self.assertEqual(self.done.get_nowait(), "DONE")
1357     self.assertRaises(Queue.Empty, self.done.get_nowait)
1358     self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1359     self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1360     self._addThread(target=self._doLockSet, args=("one", 0))
1361     self._addThread(target=self._doLockSet, args=("one", 1))
1362     self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1363     self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
1364     self.assertRaises(Queue.Empty, self.done.get_nowait)
1365     self.ls.release()
1366     self._waitThreads()
1367     for _ in range(6):
1368       self.failUnlessEqual(self.done.get_nowait(), "DONE")
1369
  @_Repeat
  def testSimpleAcquireTimeoutExpiring(self):
    # For each (wanted, block) pair the main thread holds "block" exclusively
    # while another thread tries to acquire "wanted" with a short timeout;
    # that acquire is expected to time out every time.
    names = sorted(self.ls._names())
    self.assert_(len(names) >= 3)

    # Get name of first lock
    first = names[0]

    # Get name of last lock (this also removes it from "names")
    last = names.pop()

    # Tuples are (name(s) wanted by the thread, name blocked by main thread)
    checks = [
      # Block first and try to lock it again
      (first, first),

      # Block first and try to lock all locks
      (None, first),

      # Block last and try to lock it again
      (last, last),
      ]

    for (wanted, block) in checks:
      # Lock in exclusive mode
      self.assert_(self.ls.acquire(block, shared=0))

      def _AcquireOne():
        # Try to get the same lock again with a timeout (should never succeed)
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
        if acquired:
          self.done.put("acquired")
          self.ls.release()
        else:
          # A timed out acquire returns None and leaves nothing owned
          self.assert_(acquired is None)
          self.assertFalse(self.ls.list_owned())
          self.assertFalse(self.ls.is_owned())
          self.done.put("not acquired")

      self._addThread(target=_AcquireOne)

      # Wait for timeout in thread to expire
      self._waitThreads()

      # Release exclusive lock again
      self.ls.release()

      self.assertEqual(self.done.get_nowait(), "not acquired")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1418
  @_Repeat
  def testDelayedAndExpiringLockAcquire(self):
    # Two passes over the same scenario: the main thread holds all eight locks
    # and releases them one by one while another thread tries to acquire all
    # of them. In the first pass the thread has plenty of time and must get
    # everything; in the second pass the main thread delays each release so
    # the overall acquire is expected to time out.
    self._setUpLS()
    self.ls.add(["five", "six", "seven", "eight", "nine"])

    for expire in (False, True):
      names = sorted(self.ls._names())
      self.assertEqual(len(names), 8)

      # One event per lock, set when the thread is notified about that lock
      lock_ev = dict([(i, threading.Event()) for i in names])

      # Lock all in exclusive mode
      self.assert_(self.ls.acquire(names, shared=0))

      if expire:
        # We'll wait at least 300ms per lock
        lockwait = len(names) * [0.3]

        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
        # this gives us up to 2.4s to fail.
        lockall_timeout = 0.4
      else:
        # This should finish rather quickly
        lockwait = None
        lockall_timeout = len(names) * 5.0

      def _LockAll():
        def acquire_notification(name):
          # Progress is only recorded in the pass which has to succeed
          if not expire:
            self.done.put("getting %s" % name)

          # Kick next lock
          lock_ev[name].set()

        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                           test_notify=acquire_notification):
          self.done.put("got all")
          self.ls.release()
        else:
          self.done.put("timeout on all")

        # Notify all locks
        for ev in lock_ev.values():
          ev.set()

      t = self._addThread(target=_LockAll)

      for idx, name in enumerate(names):
        # Wait for actual acquire on this lock to start
        lock_ev[name].wait(10.0)

        if expire and t.isAlive():
          # Wait some time after getting the notification to make sure the lock
          # acquire will expire
          SafeSleep(lockwait[idx])

        self.ls.release(names=name)

      self.assertFalse(self.ls.list_owned())

      self._waitThreads()

      if expire:
        # Not checking which locks were actually acquired. Doing so would be
        # too timing-dependent.
        self.assertEqual(self.done.get_nowait(), "timeout on all")
      else:
        # Notifications must have arrived in sorted lock name order
        for i in names:
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
        self.assertEqual(self.done.get_nowait(), "got all")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1490
1491   @_Repeat
1492   def testConcurrentRemove(self):
1493     self.ls.add("four")
1494     self.ls.acquire(["one", "two", "four"])
1495     self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
1496     self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
1497     self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1498     self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1499     self.assertRaises(Queue.Empty, self.done.get_nowait)
1500     self.ls.remove("one")
1501     self.ls.release()
1502     self._waitThreads()
1503     for i in range(4):
1504       self.failUnlessEqual(self.done.get_nowait(), "ERR")
1505     self.ls.add(["five", "six"], acquired=1)
1506     self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
1507     self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
1508     self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
1509     self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
1510     self.ls.remove("five")
1511     self.ls.release()
1512     self._waitThreads()
1513     for i in range(4):
1514       self.failUnlessEqual(self.done.get_nowait(), "DONE")
1515     self.ls.acquire(["three", "four"])
1516     self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
1517     self.assertRaises(Queue.Empty, self.done.get_nowait)
1518     self.ls.remove("four")
1519     self._waitThreads()
1520     self.assertEqual(self.done.get_nowait(), ["six"])
1521     self._addThread(target=self._doRemoveSet, args=(["two"]))
1522     self._waitThreads()
1523     self.assertEqual(self.done.get_nowait(), ["two"])
1524     self.ls.release()
1525     # reset lockset
1526     self._setUpLS()
1527
1528   @_Repeat
1529   def testConcurrentSharedSetLock(self):
1530     # share the set-lock...
1531     self.ls.acquire(None, shared=1)
1532     # ...another thread can share it too
1533     self._addThread(target=self._doLockSet, args=(None, 1))
1534     self._waitThreads()
1535     self.assertEqual(self.done.get_nowait(), "DONE")
1536     # ...or just share some elements
1537     self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
1538     self._waitThreads()
1539     self.assertEqual(self.done.get_nowait(), "DONE")
1540     # ...but not add new ones or remove any
1541     t = self._addThread(target=self._doAddSet, args=(["nine"]))
1542     self._addThread(target=self._doRemoveSet, args=(["two"], ))
1543     self.assertRaises(Queue.Empty, self.done.get_nowait)
1544     # this just releases the set-lock
1545     self.ls.release([])
1546     t.join(60)
1547     self.assertEqual(self.done.get_nowait(), "DONE")
1548     # release the lock on the actual elements so remove() can proceed too
1549     self.ls.release()
1550     self._waitThreads()
1551     self.failUnlessEqual(self.done.get_nowait(), ["two"])
1552     # reset lockset
1553     self._setUpLS()
1554
1555   @_Repeat
1556   def testConcurrentExclusiveSetLock(self):
1557     # acquire the set-lock...
1558     self.ls.acquire(None, shared=0)
1559     # ...no one can do anything else
1560     self._addThread(target=self._doLockSet, args=(None, 1))
1561     self._addThread(target=self._doLockSet, args=(None, 0))
1562     self._addThread(target=self._doLockSet, args=(["three"], 0))
1563     self._addThread(target=self._doLockSet, args=(["two"], 1))
1564     self._addThread(target=self._doAddSet, args=(["nine"]))
1565     self.assertRaises(Queue.Empty, self.done.get_nowait)
1566     self.ls.release()
1567     self._waitThreads()
1568     for _ in range(5):
1569       self.assertEqual(self.done.get(True, 1), "DONE")
1570     # cleanup
1571     self._setUpLS()
1572
1573   @_Repeat
1574   def testConcurrentSetLockAdd(self):
1575     self.ls.acquire("one")
1576     # Another thread wants the whole SetLock
1577     self._addThread(target=self._doLockSet, args=(None, 0))
1578     self._addThread(target=self._doLockSet, args=(None, 1))
1579     self.assertRaises(Queue.Empty, self.done.get_nowait)
1580     self.assertRaises(AssertionError, self.ls.add, "four")
1581     self.ls.release()
1582     self._waitThreads()
1583     self.assertEqual(self.done.get_nowait(), "DONE")
1584     self.assertEqual(self.done.get_nowait(), "DONE")
1585     self.ls.acquire(None)
1586     self._addThread(target=self._doLockSet, args=(None, 0))
1587     self._addThread(target=self._doLockSet, args=(None, 1))
1588     self.assertRaises(Queue.Empty, self.done.get_nowait)
1589     self.ls.add("four")
1590     self.ls.add("five", acquired=1)
1591     self.ls.add("six", acquired=1, shared=1)
1592     self.assertEquals(self.ls.list_owned(),
1593       set(["one", "two", "three", "five", "six"]))
1594     self.assertEquals(self.ls.is_owned(), True)
1595     self.assertEquals(self.ls._names(),
1596       set(["one", "two", "three", "four", "five", "six"]))
1597     self.ls.release()
1598     self._waitThreads()
1599     self.assertEqual(self.done.get_nowait(), "DONE")
1600     self.assertEqual(self.done.get_nowait(), "DONE")
1601     self._setUpLS()
1602
1603   @_Repeat
1604   def testEmptyLockSet(self):
1605     # get the set-lock
1606     self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
1607     # now empty it...
1608     self.ls.remove(["one", "two", "three"])
1609     self.assertFalse(self.ls._names())
1610     # and adds/locks by another thread still wait
1611     self._addThread(target=self._doAddSet, args=(["nine"]))
1612     self._addThread(target=self._doLockSet, args=(None, 1))
1613     self._addThread(target=self._doLockSet, args=(None, 0))
1614     self.assertRaises(Queue.Empty, self.done.get_nowait)
1615     self.ls.release()
1616     self._waitThreads()
1617     for _ in range(3):
1618       self.assertEqual(self.done.get_nowait(), "DONE")
1619     # empty it again...
1620     self.assertEqual(self.ls.remove(["nine"]), ["nine"])
1621     # now share it...
1622     self.assertEqual(self.ls.acquire(None, shared=1), set())
1623     # other sharers can go, adds still wait
1624     self._addThread(target=self._doLockSet, args=(None, 1))
1625     self._waitThreads()
1626     self.assertEqual(self.done.get_nowait(), "DONE")
1627     self._addThread(target=self._doAddSet, args=(["nine"]))
1628     self.assertRaises(Queue.Empty, self.done.get_nowait)
1629     self.ls.release()
1630     self._waitThreads()
1631     self.assertEqual(self.done.get_nowait(), "DONE")
1632     self._setUpLS()
1633
1634   def testAcquireWithNamesDowngrade(self):
1635     self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1636     self.assertTrue(self.ls.is_owned())
1637     self.assertFalse(self.ls._get_lock().is_owned())
1638     self.ls.release()
1639     self.assertFalse(self.ls.is_owned())
1640     self.assertFalse(self.ls._get_lock().is_owned())
1641     # Can't downgrade after releasing
1642     self.assertRaises(AssertionError, self.ls.downgrade, "two")
1643
1644   def testDowngrade(self):
1645     # Not owning anything, must raise an exception
1646     self.assertFalse(self.ls.is_owned())
1647     self.assertRaises(AssertionError, self.ls.downgrade)
1648
1649     self.assertFalse(compat.any(i.is_owned()
1650                                 for i in self.ls._get_lockdict().values()))
1651     self.assertFalse(self.ls.check_owned(self.ls._names()))
1652     for name in self.ls._names():
1653       self.assertFalse(self.ls.check_owned(name))
1654
1655     self.assertEquals(self.ls.acquire(None, shared=0),
1656                       set(["one", "two", "three"]))
1657     self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1658
1659     self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1660     for name in self.ls._names():
1661       self.assertTrue(self.ls.check_owned(name))
1662       self.assertTrue(self.ls.check_owned(name, shared=0))
1663       self.assertFalse(self.ls.check_owned(name, shared=1))
1664
1665     self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1666     self.assertTrue(compat.all(i.is_owned(shared=0)
1667                                for i in self.ls._get_lockdict().values()))
1668
1669     # Start downgrading locks
1670     self.assertTrue(self.ls.downgrade(names=["one"]))
1671     self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1672     self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1673                                for name, lock in
1674                                  self.ls._get_lockdict().items()))
1675
1676     self.assertFalse(self.ls.check_owned("one", shared=0))
1677     self.assertTrue(self.ls.check_owned("one", shared=1))
1678     self.assertTrue(self.ls.check_owned("two", shared=0))
1679     self.assertTrue(self.ls.check_owned("three", shared=0))
1680
1681     # Downgrade second lock
1682     self.assertTrue(self.ls.downgrade(names="two"))
1683     self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1684     should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1685     self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1686                                for name, lock in
1687                                  self.ls._get_lockdict().items()))
1688
1689     self.assertFalse(self.ls.check_owned("one", shared=0))
1690     self.assertTrue(self.ls.check_owned("one", shared=1))
1691     self.assertFalse(self.ls.check_owned("two", shared=0))
1692     self.assertTrue(self.ls.check_owned("two", shared=1))
1693     self.assertTrue(self.ls.check_owned("three", shared=0))
1694
1695     # Downgrading the last exclusive lock to shared must downgrade the
1696     # lockset-internal lock too
1697     self.assertTrue(self.ls.downgrade(names="three"))
1698     self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1699     self.assertTrue(compat.all(i.is_owned(shared=1)
1700                                for i in self.ls._get_lockdict().values()))
1701
1702     # Verify owned locks
1703     for name in self.ls._names():
1704       self.assertTrue(self.ls.check_owned(name, shared=1))
1705
1706     # Downgrading a shared lock must be a no-op
1707     self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1708     self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1709     self.assertTrue(compat.all(i.is_owned(shared=1)
1710                                for i in self.ls._get_lockdict().values()))
1711
1712     self.ls.release()
1713
1714   def testDowngradeEverything(self):
1715     self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
1716                      set(["one", "two", "three"]))
1717     self.assertTrue(self.ls.owning_all())
1718
1719     # Ensure all locks are now owned in exclusive mode
1720     for name in self.ls._names():
1721       self.assertTrue(self.ls.check_owned(name, shared=0))
1722
1723     # Downgrade everything
1724     self.assertTrue(self.ls.downgrade())
1725
1726     # Ensure all locks are now owned in shared mode
1727     for name in self.ls._names():
1728       self.assertTrue(self.ls.check_owned(name, shared=1))
1729
1730     self.assertTrue(self.ls.owning_all())
1731
1732   def testPriority(self):
1733     def _Acquire(prev, next, name, priority, success_fn):
1734       prev.wait()
1735       self.assert_(self.ls.acquire(name, shared=0,
1736                                    priority=priority,
1737                                    test_notify=lambda _: next.set()))
1738       try:
1739         success_fn()
1740       finally:
1741         self.ls.release()
1742
1743     # Get all in exclusive mode
1744     self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1745
1746     done_two = Queue.Queue(0)
1747
1748     first = threading.Event()
1749     prev = first
1750
1751     acquires = [("one", prio, self.done) for prio in range(1, 33)]
1752     acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1753
1754     # Use a deterministic random generator
1755     random.Random(741).shuffle(acquires)
1756
1757     for (name, prio, done) in acquires:
1758       ev = threading.Event()
1759       self._addThread(target=_Acquire,
1760                       args=(prev, ev, name, prio,
1761                             compat.partial(done.put, "Prio%s" % prio)))
1762       prev = ev
1763
1764     # Start acquires
1765     first.set()
1766
1767     # Wait for last acquire to start
1768     prev.wait()
1769
1770     # Let threads acquire locks
1771     self.ls.release()
1772
1773     # Wait for threads to finish
1774     self._waitThreads()
1775
1776     for i in range(1, 33):
1777       self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1778       self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1779
1780     self.assertRaises(Queue.Empty, self.done.get_nowait)
1781     self.assertRaises(Queue.Empty, done_two.get_nowait)
1782
1783
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for L{locking.GanetiLockManager}.

  Every test starts with a fresh manager holding two nodes, two node groups,
  three instances and three networks.

  """
  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.nodes = ["n1", "n2"]
    self.nodegroups = ["g1", "g2"]
    self.instances = ["i1", "i2", "i3"]
    self.networks = ["net1", "net2", "net3"]
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
                                        self.instances, self.networks)

  def tearDown(self):
    # Don't try this at home...
    # Creating a second manager fails (see testDoubleGLFails), so the marker
    # is reset to let the next test's setUp build a new one
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    """Lock level constants must equal their index in L{locking.LEVELS}."""
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    """Creating a second manager while one exists must fail."""
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])

  def testLockNames(self):
    """Each level must expose exactly the names given at construction."""
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
                     set(self.networks))

  def testInitAndResources(self):
    """Check the lock names for differently populated managers."""
    # (nodes, nodegroups, instances, networks) for each manager to build
    setups = [
      ([], [], [], []),
      (self.nodes, self.nodegroups, [], []),
      ([], [], self.instances, []),
      ([], [], [], self.networks),
      ]

    for (nodes, nodegroups, instances, networks) in setups:
      # Forget the previous manager, otherwise the constructor would fail
      locking.GanetiLockManager._instance = None
      self.GL = locking.GanetiLockManager(nodes, nodegroups,
                                          instances, networks)

      # The cluster and node allocation locks always exist
      self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
      self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))

      # All other levels (including networks, for every setup) must contain
      # exactly what was passed in
      self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
      self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                       set(nodegroups))
      self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                       set(instances))
      self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
                       set(networks))

  def testAcquireRelease(self):
    """Acquire and release individual locks on several levels."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
    self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
    self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
                                        shared=1))
    # "i3" was never acquired, hence the combined check must fail
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
    self.GL.release(locking.LEVEL_NODE, ["n2"])
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    # Unknown lock names are rejected
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ["i5"])
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))

  def testAcquireWholeSets(self):
    """Acquire complete levels by passing C{None} as names."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                     set(self.nodegroups))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
    self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
    self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    """Mix a whole-level acquire with a partial one on another level."""
    self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
                     set(["n2"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(["n2"]))
    self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
    # Only one of the two nodes is held
    self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    """Other levels need the BGL, which can't be released before them."""
    # Without the BGL nothing else may be acquired
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ["n1", "n2"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ["i3"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ["g1"])
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    # While a node lock is held the BGL must stay
    self.GL.acquire(locking.LEVEL_NODE, ["n1"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ["BGL"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)
    # Same for instance locks
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ["BGL"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)
    # ... and for node groups, even after a partial release
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
    self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ["BGL"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testWrongOrder(self):
    """While holding a node lock, further acquires tested here must fail."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ["n2"])
    # Neither the same level nor the node group or instance levels may be
    # acquired once a node lock is held
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ["n1"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ["g1"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ["i2"])

  def testModifiableLevels(self):
    """Names can be added/removed on all but the fixed levels."""
    # The cluster and node allocation levels can't be extended
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
                      ["BGL2"])
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC,
                      ["NAL2"])
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
    self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
    self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
    self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
    self.GL.add(locking.LEVEL_NODE, ["n3"])
    self.GL.remove(locking.LEVEL_NODE, ["n1"])
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
    self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))
    # Removing from the cluster level isn't allowed either
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
                      ["BGL2"])

  # Helper function to run as a thread that shares the BGL and then acquires
  # some locks at another level.
  def _doLock(self, level, names, shared):
    """Acquire C{names} at C{level} while sharing the BGL.

    Puts "DONE" into C{self.done} once the locks are held and "ERR" if a
    L{errors.LockError} is raised on the way.

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
      self.GL.acquire(level, names, shared=shared)
      self.done.put("DONE")
      self.GL.release(level)
      self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put("ERR")

  @_Repeat
  def testConcurrency(self):
    """Check how instance locks interact between two threads."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    # Nothing held at instance level, the thread gets its lock right away
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    # Holding a different instance doesn't get in the way
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    # "i3" is held exclusively here, so the thread has to wait for the release
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i3", 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    # Two shared acquires of the same instance can coexist
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i2", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    # An exclusive acquire has to wait until the shared lock is released
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i2", 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), "DONE")
    self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
2023
2024
class TestLockMonitor(_ThreadedTestCase):
  """Tests for L{locking.LockMonitor} and its lock queries."""
  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()

  def testSingleThread(self):
    """Register many locks from one thread and drop them again."""
    locks = []

    for i in range(100):
      name = "TestLock%s" % i
      locks.append(locking.SharedLock(name, monitor=self.lm))

    self.assertEqual(len(self.lm._locks), len(locks))
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
    self.assertEqual(len(result.fields), 1)
    self.assertEqual(len(result.data), 100)

    # Delete all locks
    del locks[:]

    # The garbage collector might need some time
    def _CheckLocks():
      if self.lm._locks:
        raise utils.RetryAgain()

    utils.Retry(_CheckLocks, 0.1, 30.0)

    self.assertFalse(self.lm._locks)

  def testMultiThread(self):
    """Create locks from many threads and query them in various states."""
    locks = []

    def _CreateLock(prev_ev, next_ev, name):
      # Threads are chained through events, hence locks are created in the
      # order in which the threads were added
      prev_ev.wait()
      locks.append(locking.SharedLock(name, monitor=self.lm))
      if next_ev:
        next_ev.set()

    expnames = []

    first = threading.Event()
    prev = first

    # Use a deterministic random generator
    for i in random.Random(4263).sample(range(100), 33):
      name = "MtTestLock%s" % i
      expnames.append(name)

      ev = threading.Event()
      self._addThread(target=_CreateLock, args=(prev, ev, name))
      prev = ev

    # Add locks
    first.set()
    self._waitThreads()

    # Check order in which locks were added
    self.assertEqual([i.name for i in locks], expnames)

    # Check query result
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
    self.assertTrue(isinstance(result, dict))
    response = objects.QueryResponse.FromDict(result)
    self.assertEqual(response.data,
                     [[(constants.RS_NORMAL, name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, [])]
                      for name in utils.NiceSort(expnames)])
    self.assertEqual(len(response.fields), 4)
    self.assertEqual(["name", "mode", "owner", "pending"],
                     [fdef.name for fdef in response.fields])

    # Test exclusive acquire
    for tlock in locks[::4]:
      tlock.acquire(shared=0)
      try:
        def _GetExpResult(name):
          # Only the lock held right now has a mode and an owner
          if tlock.name == name:
            return [(constants.RS_NORMAL, name),
                    (constants.RS_NORMAL, "exclusive"),
                    (constants.RS_NORMAL,
                     [threading.currentThread().getName()]),
                    (constants.RS_NORMAL, [])]
          return [(constants.RS_NORMAL, name),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, [])]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [_GetExpResult(name)
                          for name in utils.NiceSort(expnames)])
      finally:
        tlock.release()

    # Test shared acquire
    def _Acquire(lock, shared, ev, notify):
      # Hold the lock until the main thread signals "ev"
      lock.acquire(shared=shared)
      try:
        notify.set()
        ev.wait()
      finally:
        lock.release()

    for tlock1 in locks[::11]:
      for tlock2 in locks[::-15]:
        if tlock2 == tlock1:
          # Avoid deadlocks
          continue

        for tlock3 in locks[::10]:
          if tlock3 in (tlock2, tlock1):
            # Avoid deadlocks
            continue

          releaseev = threading.Event()

          # Acquire locks
          acquireev = []
          tthreads1 = []
          for _ in range(3):
            ev = threading.Event()
            tthreads1.append(self._addThread(target=_Acquire,
                                             args=(tlock1, 1, releaseev, ev)))
            acquireev.append(ev)

          ev = threading.Event()
          tthread2 = self._addThread(target=_Acquire,
                                     args=(tlock2, 1, releaseev, ev))
          acquireev.append(ev)

          ev = threading.Event()
          tthread3 = self._addThread(target=_Acquire,
                                     args=(tlock3, 0, releaseev, ev))
          acquireev.append(ev)

          # Wait for all locks to be acquired
          for acquired in acquireev:
            acquired.wait()

          # Check query result
          result = self.lm.QueryLocks(["name", "mode", "owner"])
          response = objects.QueryResponse.FromDict(result)
          for (name, mode, owner) in response.data:
            (name_status, name_value) = name
            (owner_status, owner_value) = owner

            self.assertEqual(name_status, constants.RS_NORMAL)
            self.assertEqual(owner_status, constants.RS_NORMAL)

            if name_value == tlock1.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(set(owner_value),
                               set(i.getName() for i in tthreads1))
              continue

            if name_value == tlock2.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(owner_value, [tthread2.getName()])
              continue

            if name_value == tlock3.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
              self.assertEqual(owner_value, [tthread3.getName()])
              continue

            # All remaining locks are not held by anyone
            self.assertTrue(name_value in expnames)
            self.assertEqual(mode, (constants.RS_NORMAL, None))
            self.assertTrue(owner_value is None)

          # Release locks again
          releaseev.set()

          self._waitThreads()

          result = self.lm.QueryLocks(["name", "mode", "owner"])
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
                           [[(constants.RS_NORMAL, name),
                             (constants.RS_NORMAL, None),
                             (constants.RS_NORMAL, None)]
                            for name in utils.NiceSort(expnames)])

  def testDelete(self):
    """A deleted lock stays registered and is reported as "deleted"."""
    lock = locking.SharedLock("TestLock", monitor=self.lm)

    self.assertEqual(len(self.lm._locks), 1)
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]])

    lock.delete()

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]])
    self.assertEqual(len(self.lm._locks), 1)

  def testPending(self):
    """Pending acquires must show up in the query result."""
    def _Acquire(lock, shared, prev_ev, next_ev):
      prev_ev.wait()

      # Get the lock and give it back right away; only the time spent
      # waiting for it matters for this test
      lock.acquire(shared=shared, test_notify=next_ev.set)
      lock.release()

    lock = locking.SharedLock("ExcLock", monitor=self.lm)

    for shared in [0, 1]:
      lock.acquire()
      try:
        self.assertEqual(len(self.lm._locks), 1)
        result = self.lm.QueryLocks(["name", "mode", "owner"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()])]])

        threads = []

        first = threading.Event()
        prev = first

        for _ in range(5):
          ev = threading.Event()
          threads.append(self._addThread(target=_Acquire,
                                          args=(lock, shared, prev, ev)))
          prev = ev

        # Start acquires
        first.set()

        # Wait for last acquire to start waiting
        prev.wait()

        # NOTE: This works only because QueryLocks will acquire the
        # lock-internal lock again and won't be able to get the information
        # until it has the lock. By then the acquire should be registered in
        # SharedLock.__pending (otherwise it's a bug).

        # All acquires are waiting now
        if shared:
          # Shared acquires are reported as one group of waiting threads
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
        else:
          # Exclusive acquires are reported one by one
          pending = [("exclusive", [t.getName()]) for t in threads]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()]),
                           (constants.RS_NORMAL, pending)]])

        self.assertEqual(len(self.lm._locks), 1)
      finally:
        lock.release()

      self._waitThreads()

      # No pending acquires
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       [[(constants.RS_NORMAL, lock.name),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, [])]])

      self.assertEqual(len(self.lm._locks), 1)

  def testDeleteAndRecreate(self):
    """Locks sharing one name must be tracked and reported separately."""
    lname = "TestLock101923193"

    # Create some locks with the same name and keep all references
    locks = [locking.SharedLock(lname, monitor=self.lm)
             for _ in range(5)]

    self.assertEqual(len(self.lm._locks), len(locks))

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 5)

    locks[2].delete()

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2 +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]] +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2)

    locks[1].acquire(shared=0)

    # Expected rows, in the order in which the locks were created
    last_status = [
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "exclusive"),
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "deleted"),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      ]

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)

    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
    self.assertEqual(len(self.lm._locks), len(locks))

    # Check lock deletion
    for idx in range(len(locks)):
      # Always drop the oldest lock, so the expected rows shrink from the front
      del locks[0]
      self.assertTrue(gc.isenabled())
      gc.collect()
      self.assertEqual(len(self.lm._locks), len(locks))
      result = self.lm.QueryLocks(["name", "mode", "owner"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       last_status[idx + 1:])

    # All locks should have been deleted
    self.assertFalse(locks)
    self.assertFalse(self.lm._locks)

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])

  class _FakeLock:
    """Stand-in for a lock, returning canned information to the monitor.

    Results are queued with L{AddResult} and handed out, one per call and in
    the order in which they were added, by L{GetLockInfo}.

    """
    def __init__(self):
      self._info = []

    def AddResult(self, *args):
      """Queue one C{(expected request, result)} pair."""
      self._info.append(args)

    def CountPending(self):
      """Return the number of queued results not yet handed out."""
      return len(self._info)

    def GetLockInfo(self, requested):
      """Return the next queued result, verifying what was requested."""
      (exp_requested, result) = self._info.pop(0)

      if exp_requested != requested:
        raise Exception("Requested information (%s) does not match"
                        " expectations (%s)" % (requested, exp_requested))

      return result

  def testMultipleResults(self):
    """Check merging and ordering of results from several lock sources."""
    fl1 = self._FakeLock()
    fl2 = self._FakeLock()

    self.lm.RegisterLock(fl1)
    self.lm.RegisterLock(fl2)

    # Empty information
    for i in [fl1, fl2]:
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
    for i in [fl1, fl2]:
      self.assertEqual(i.CountPending(), 0)

    # Check ordering
    for fn in [lambda x: x, reversed, sorted]:
      # Whatever the incoming order, the result must be sorted by name
      fl1.AddResult(set(), list(fn([
        ("aaa", None, None, None),
        ("bbb", None, None, None),
        ])))
      fl2.AddResult(set(), [])
      result = self.lm.QueryLocks(["name"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
        [(constants.RS_NORMAL, "aaa")],
        [(constants.RS_NORMAL, "bbb")],
        ])
      for i in [fl1, fl2]:
        self.assertEqual(i.CountPending(), 0)

      for fn2 in [lambda x: x, reversed, sorted]:
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
          # Same name, but different information
          ("aaa", "mode0", None, None),
          ("aaa", "mode1", None, None),
          ("aaa", "mode2", None, None),
          ("aaa", "mode3", None, None),
          ])))
        fl2.AddResult(set([query.LQ_MODE]), [
          ("zzz", "end", None, None),
          ("000", "start", None, None),
          ] + list(fn2([
          ("aaa", "b200", None, None),
          ("aaa", "b300", None, None),
          ])))
        result = self.lm.QueryLocks(["name", "mode"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
          ] + list(fn([
          # Name is the same, so order must be equal to incoming order
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
          ])) + list(fn2([
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
          ])) + [
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
          ])
        for i in [fl1, fl2]:
          self.assertEqual(i.CountPending(), 0)
2457
2458
# Run the tests through Ganeti's own test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()