Introduce a TMaybe combinator
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
_decoratorlock = locking.SharedLock("decorator lock")

#: List for looping tests; the L{_Repeat} decorator calls the decorated
#: function once for every element of this list
ITERATIONS = range(8)
51
52
def _Repeat(fn):
  """Decorator calling the wrapped function once per L{ITERATIONS} entry

  The arguments given to the wrapper are passed unchanged to every call. The
  return values of the individual calls are discarded and the wrapper itself
  returns C{None}. An exception raised by one of the calls ends the loop.

  """
  def wrapper(*args, **kwargs):
    # Only the number of elements matters, not their values
    for _ in ITERATIONS:
      fn(*args, **kwargs)
  return wrapper
59
60
def SafeSleep(duration):
  """Sleep until at least C{duration} seconds have passed

  C{time.sleep} is called again with the remaining time for as long as the
  deadline, computed once from the current time, has not been reached.

  @param duration: Number of seconds to sleep; values less than or equal to
    zero return without sleeping

  """
  deadline = time.time() + duration
  remaining = deadline - time.time()
  # Written as "not <=" to keep the comparison used for ending the loop
  while not remaining <= 0.0:
    time.sleep(remaining)
    remaining = deadline - time.time()
68
69
70 class _ThreadedTestCase(unittest.TestCase):
71   """Test class that supports adding/waiting on threads"""
72   def setUp(self):
73     unittest.TestCase.setUp(self)
74     self.done = Queue.Queue(0)
75     self.threads = []
76
77   def _addThread(self, *args, **kwargs):
78     """Create and remember a new thread"""
79     t = threading.Thread(*args, **kwargs)
80     self.threads.append(t)
81     t.start()
82     return t
83
84   def _waitThreads(self):
85     """Wait for all our threads to finish"""
86     for t in self.threads:
87       t.join(60)
88       self.failIf(t.isAlive())
89     self.threads = []
90
91
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the test cases for condition classes"""

  def setUp(self, cls):
    """Create the lock and the condition under test

    @param cls: Condition class, instantiated with a C{threading.Lock}

    """
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _testAcquireRelease(self):
    """Check ownership reporting before, during and after an acquire"""
    cond = self.cond

    # Not owned yet, waiting and notifying must be refused
    self.assertFalse(cond._is_owned())
    self.assertRaises(RuntimeError, cond.wait, None)
    self.assertRaises(RuntimeError, cond.notifyAll)

    # Notifying must not change the ownership
    cond.acquire()
    self.assertTrue(cond._is_owned())
    cond.notifyAll()
    self.assertTrue(cond._is_owned())
    cond.release()

    # Released again, same expectations as in the beginning
    self.assertFalse(cond._is_owned())
    self.assertRaises(RuntimeError, cond.wait, None)
    self.assertRaises(RuntimeError, cond.notifyAll)

  def _testNotification(self):
    """Check that a waiter is woken up by a notification from a thread"""
    def _Notifier():
      # "NE": thread entered, "NA": condition acquired, "NN": notified
      report = self.done.put
      report("NE")
      self.cond.acquire()
      report("NA")
      self.cond.notifyAll()
      report("NN")
      self.cond.release()

    self.cond.acquire()
    self._addThread(target=_Notifier)
    self.assertEqual(self.done.get(True, 1), "NE")
    # The thread can't get any further while the condition is held here
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.wait(None)
    for expected in ["NA", "NN"]:
      self.assertEqual(self.done.get(True, 1), expected)
    # The condition must be owned again after waiting
    self.assertTrue(self.cond._is_owned())
    self.cond.release()
    self.assertFalse(self.cond._is_owned())
134
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """Tests for L{locking.SingleNotifyPipeCondition}"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    # Waiting several times in a row, with timeouts, has to work
    cond = self.cond
    cond.acquire()
    for timeout in [0, 0.1]:
      cond.wait(timeout)
    cond.release()

  def testNoNotifyReuse(self):
    # After a notification neither waiting nor notifying again is accepted
    cond = self.cond
    cond.acquire()
    cond.notifyAll()
    self.assertRaises(RuntimeError, cond.wait, None)
    self.assertRaises(RuntimeError, cond.notifyAll)
    cond.release()
160
161
class TestPipeCondition(_ConditionTestCase):
  """Tests for L{locking.PipeCondition}"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check how they are woken up

    @param fn: Thread function; the ones used in this class acquire the
      condition, report "A", wait, release and report "W"

    """
    waiting = [self._addThread(target=fn) for _ in range(3)]

    # Every thread reports "A" before it starts waiting
    for _ in waiting:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(len(self.cond._waiters), 3)
    self.assertEqual(self.cond._waiters, set(waiting))

    text = repr(self.cond)
    self.assertTrue(text.startswith("<"))
    self.assertTrue("waiters=" in text)

    # A thread started now can't acquire the lock, and therefore can't call
    # wait, before the lock is released below
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # Three "W" (woken threads) and one "A" (the new thread) are expected, in
    # whatever order
    seen = {"W": 0, "A": 0}
    for _ in range(4):
      item = self.done.get(True, 1)
      if item not in seen:
        self.fail("Got %s on the done queue" % item)
      seen[item] += 1

    self.assertEqual(seen["W"], 3)
    self.assertEqual(seen["A"], 1)

    # Wake up the thread started last
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testBlockingWait(self):
    def _WaitForever():
      # Waits without timeout
      self.cond.acquire()
      self.done.put("A")
      self.cond.wait(None)
      self.cond.release()
      self.done.put("W")

    self._TestWait(_WaitForever)

  def testLongTimeoutWait(self):
    def _WaitLong():
      # Waits with a timeout of 15 seconds
      self.cond.acquire()
      self.done.put("A")
      self.cond.wait(15.0)
      self.cond.release()
      self.done.put("W")

    self._TestWait(_WaitLong)

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with a timeout and report afterwards

    @param timeout: Timeout passed to the condition's C{wait}
    @param check: Value put on the C{done} queue after waiting

    """
    cond = self.cond
    cond.acquire()
    cond.wait(timeout)
    cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    # Nobody notifies, both waits have to end on their own
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    # Same as above with a timeout of zero
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
266
267
268 class TestSharedLock(_ThreadedTestCase):
269   """SharedLock tests"""
270
271   def setUp(self):
272     _ThreadedTestCase.setUp(self)
273     self.sl = locking.SharedLock("TestSharedLock")
274
275     self.assertTrue(repr(self.sl).startswith("<"))
276     self.assertTrue("name=TestSharedLock" in repr(self.sl))
277
278   def testSequenceAndOwnership(self):
279     self.assertFalse(self.sl.is_owned())
280     self.sl.acquire(shared=1)
281     self.assert_(self.sl.is_owned())
282     self.assert_(self.sl.is_owned(shared=1))
283     self.assertFalse(self.sl.is_owned(shared=0))
284     self.sl.release()
285     self.assertFalse(self.sl.is_owned())
286     self.sl.acquire()
287     self.assert_(self.sl.is_owned())
288     self.assertFalse(self.sl.is_owned(shared=1))
289     self.assert_(self.sl.is_owned(shared=0))
290     self.sl.release()
291     self.assertFalse(self.sl.is_owned())
292     self.sl.acquire(shared=1)
293     self.assert_(self.sl.is_owned())
294     self.assert_(self.sl.is_owned(shared=1))
295     self.assertFalse(self.sl.is_owned(shared=0))
296     self.sl.release()
297     self.assertFalse(self.sl.is_owned())
298
299   def testBooleanValue(self):
300     # semaphores are supposed to return a true value on a successful acquire
301     self.assert_(self.sl.acquire(shared=1))
302     self.sl.release()
303     self.assert_(self.sl.acquire())
304     self.sl.release()
305
306   def testDoubleLockingStoE(self):
307     self.sl.acquire(shared=1)
308     self.assertRaises(AssertionError, self.sl.acquire)
309
310   def testDoubleLockingEtoS(self):
311     self.sl.acquire()
312     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
313
314   def testDoubleLockingStoS(self):
315     self.sl.acquire(shared=1)
316     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
317
318   def testDoubleLockingEtoE(self):
319     self.sl.acquire()
320     self.assertRaises(AssertionError, self.sl.acquire)
321
322   # helper functions: called in a separate thread they acquire the lock, send
323   # their identifier on the done queue, then release it.
324   def _doItSharer(self):
325     try:
326       self.sl.acquire(shared=1)
327       self.done.put("SHR")
328       self.sl.release()
329     except errors.LockError:
330       self.done.put("ERR")
331
332   def _doItExclusive(self):
333     try:
334       self.sl.acquire()
335       self.done.put("EXC")
336       self.sl.release()
337     except errors.LockError:
338       self.done.put("ERR")
339
340   def _doItDelete(self):
341     try:
342       self.sl.delete()
343       self.done.put("DEL")
344     except errors.LockError:
345       self.done.put("ERR")
346
347   def testSharersCanCoexist(self):
348     self.sl.acquire(shared=1)
349     threading.Thread(target=self._doItSharer).start()
350     self.assert_(self.done.get(True, 1))
351     self.sl.release()
352
353   @_Repeat
354   def testExclusiveBlocksExclusive(self):
355     self.sl.acquire()
356     self._addThread(target=self._doItExclusive)
357     self.assertRaises(Queue.Empty, self.done.get_nowait)
358     self.sl.release()
359     self._waitThreads()
360     self.failUnlessEqual(self.done.get_nowait(), "EXC")
361
362   @_Repeat
363   def testExclusiveBlocksDelete(self):
364     self.sl.acquire()
365     self._addThread(target=self._doItDelete)
366     self.assertRaises(Queue.Empty, self.done.get_nowait)
367     self.sl.release()
368     self._waitThreads()
369     self.failUnlessEqual(self.done.get_nowait(), "DEL")
370     self.sl = locking.SharedLock(self.sl.name)
371
372   @_Repeat
373   def testExclusiveBlocksSharer(self):
374     self.sl.acquire()
375     self._addThread(target=self._doItSharer)
376     self.assertRaises(Queue.Empty, self.done.get_nowait)
377     self.sl.release()
378     self._waitThreads()
379     self.failUnlessEqual(self.done.get_nowait(), "SHR")
380
381   @_Repeat
382   def testSharerBlocksExclusive(self):
383     self.sl.acquire(shared=1)
384     self._addThread(target=self._doItExclusive)
385     self.assertRaises(Queue.Empty, self.done.get_nowait)
386     self.sl.release()
387     self._waitThreads()
388     self.failUnlessEqual(self.done.get_nowait(), "EXC")
389
390   @_Repeat
391   def testSharerBlocksDelete(self):
392     self.sl.acquire(shared=1)
393     self._addThread(target=self._doItDelete)
394     self.assertRaises(Queue.Empty, self.done.get_nowait)
395     self.sl.release()
396     self._waitThreads()
397     self.failUnlessEqual(self.done.get_nowait(), "DEL")
398     self.sl = locking.SharedLock(self.sl.name)
399
400   @_Repeat
401   def testWaitingExclusiveBlocksSharer(self):
402     """SKIPPED testWaitingExclusiveBlockSharer"""
403     return
404
405     self.sl.acquire(shared=1)
406     # the lock is acquired in shared mode...
407     self._addThread(target=self._doItExclusive)
408     # ...but now an exclusive is waiting...
409     self._addThread(target=self._doItSharer)
410     # ...so the sharer should be blocked as well
411     self.assertRaises(Queue.Empty, self.done.get_nowait)
412     self.sl.release()
413     self._waitThreads()
414     # The exclusive passed before
415     self.failUnlessEqual(self.done.get_nowait(), "EXC")
416     self.failUnlessEqual(self.done.get_nowait(), "SHR")
417
418   @_Repeat
419   def testWaitingSharerBlocksExclusive(self):
420     """SKIPPED testWaitingSharerBlocksExclusive"""
421     return
422
423     self.sl.acquire()
424     # the lock is acquired in exclusive mode...
425     self._addThread(target=self._doItSharer)
426     # ...but now a sharer is waiting...
427     self._addThread(target=self._doItExclusive)
428     # ...the exclusive is waiting too...
429     self.assertRaises(Queue.Empty, self.done.get_nowait)
430     self.sl.release()
431     self._waitThreads()
432     # The sharer passed before
433     self.assertEqual(self.done.get_nowait(), "SHR")
434     self.assertEqual(self.done.get_nowait(), "EXC")
435
436   def testDelete(self):
437     self.sl.delete()
438     self.assertRaises(errors.LockError, self.sl.acquire)
439     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
440     self.assertRaises(errors.LockError, self.sl.delete)
441
442   def testDeleteTimeout(self):
443     self.assertTrue(self.sl.delete(timeout=60))
444
445   def testDeleteTimeoutFail(self):
446     ready = threading.Event()
447     finish = threading.Event()
448
449     def fn():
450       self.sl.acquire(shared=0)
451       ready.set()
452
453       finish.wait()
454       self.sl.release()
455
456     self._addThread(target=fn)
457     ready.wait()
458
459     # Test if deleting a lock owned in exclusive mode by another thread fails
460     # to delete when a timeout is used
461     self.assertFalse(self.sl.delete(timeout=0.02))
462
463     finish.set()
464     self._waitThreads()
465
466     self.assertTrue(self.sl.delete())
467     self.assertRaises(errors.LockError, self.sl.acquire)
468
469   def testNoDeleteIfSharer(self):
470     self.sl.acquire(shared=1)
471     self.assertRaises(AssertionError, self.sl.delete)
472
473   @_Repeat
474   def testDeletePendingSharersExclusiveDelete(self):
475     self.sl.acquire()
476     self._addThread(target=self._doItSharer)
477     self._addThread(target=self._doItSharer)
478     self._addThread(target=self._doItExclusive)
479     self._addThread(target=self._doItDelete)
480     self.sl.delete()
481     self._waitThreads()
482     # The threads who were pending return ERR
483     for _ in range(4):
484       self.assertEqual(self.done.get_nowait(), "ERR")
485     self.sl = locking.SharedLock(self.sl.name)
486
487   @_Repeat
488   def testDeletePendingDeleteExclusiveSharers(self):
489     self.sl.acquire()
490     self._addThread(target=self._doItDelete)
491     self._addThread(target=self._doItExclusive)
492     self._addThread(target=self._doItSharer)
493     self._addThread(target=self._doItSharer)
494     self.sl.delete()
495     self._waitThreads()
496     # The two threads who were pending return both ERR
497     self.assertEqual(self.done.get_nowait(), "ERR")
498     self.assertEqual(self.done.get_nowait(), "ERR")
499     self.assertEqual(self.done.get_nowait(), "ERR")
500     self.assertEqual(self.done.get_nowait(), "ERR")
501     self.sl = locking.SharedLock(self.sl.name)
502
  @_Repeat
  def testExclusiveAcquireTimeout(self):
    # For both modes: a helper thread holds the lock in exclusive mode and the
    # main thread acquires with a timeout, having the helper release the lock
    # through the test_notify callback
    for shared in [0, 1]:
      # Note: on_queue is set via test_notify, nothing in this test waits for
      # it
      on_queue = threading.Event()
      release_exclusive = threading.Event()

      def _LockExclusive():
        self.sl.acquire(shared=0, test_notify=on_queue.set)
        self.done.put("A: start wait")
        release_exclusive.wait()
        self.done.put("A: end wait")
        self.sl.release()

      # Start thread to hold lock in exclusive mode
      self._addThread(target=_LockExclusive)

      # Wait for wait to begin
      self.assertEqual(self.done.get(timeout=60), "A: start wait")

      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
      # on the queue
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
                                      test_notify=release_exclusive.set))

      self.done.put("got 2nd")
      self.sl.release()

      self._waitThreads()

      # The helper's "end wait" has to come before our own entry and nothing
      # else may be left on the queue
      self.assertEqual(self.done.get_nowait(), "A: end wait")
      self.assertEqual(self.done.get_nowait(), "got 2nd")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
535
536   @_Repeat
537   def testAcquireExpiringTimeout(self):
538     def _AcquireWithTimeout(shared, timeout):
539       if not self.sl.acquire(shared=shared, timeout=timeout):
540         self.done.put("timeout")
541
542     for shared in [0, 1]:
543       # Lock exclusively
544       self.sl.acquire()
545
546       # Start shared acquires with timeout between 0 and 20 ms
547       for i in range(11):
548         self._addThread(target=_AcquireWithTimeout,
549                         args=(shared, i * 2.0 / 1000.0))
550
551       # Wait for threads to finish (makes sure the acquire timeout expires
552       # before releasing the lock)
553       self._waitThreads()
554
555       # Release lock
556       self.sl.release()
557
558       for _ in range(11):
559         self.assertEqual(self.done.get_nowait(), "timeout")
560
561       self.assertRaises(Queue.Empty, self.done.get_nowait)
562
563   @_Repeat
564   def testSharedSkipExclusiveAcquires(self):
565     # Tests whether shared acquires jump in front of exclusive acquires in the
566     # queue.
567
568     def _Acquire(shared, name, notify_ev, wait_ev):
569       if notify_ev:
570         notify_fn = notify_ev.set
571       else:
572         notify_fn = None
573
574       if wait_ev:
575         wait_ev.wait()
576
577       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
578         return
579
580       self.done.put(name)
581       self.sl.release()
582
583     # Get exclusive lock while we fill the queue
584     self.sl.acquire()
585
586     shrcnt1 = 5
587     shrcnt2 = 7
588     shrcnt3 = 9
589     shrcnt4 = 2
590
591     # Add acquires using threading.Event for synchronization. They'll be
592     # acquired exactly in the order defined in this list.
593     acquires = (shrcnt1 * [(1, "shared 1")] +
594                 3 * [(0, "exclusive 1")] +
595                 shrcnt2 * [(1, "shared 2")] +
596                 shrcnt3 * [(1, "shared 3")] +
597                 shrcnt4 * [(1, "shared 4")] +
598                 3 * [(0, "exclusive 2")])
599
600     ev_cur = None
601     ev_prev = None
602
603     for args in acquires:
604       ev_cur = threading.Event()
605       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
606       ev_prev = ev_cur
607
608     # Wait for last acquire to start
609     ev_prev.wait()
610
611     # Expect 6 pending exclusive acquires and 1 for all shared acquires
612     # together
613     self.assertEqual(self.sl._count_pending(), 7)
614
615     # Release exclusive lock and wait
616     self.sl.release()
617
618     self._waitThreads()
619
620     # Check sequence
621     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
622       # Shared locks aren't guaranteed to be notified in order, but they'll be
623       # first
624       tmp = self.done.get_nowait()
625       if tmp == "shared 1":
626         shrcnt1 -= 1
627       elif tmp == "shared 2":
628         shrcnt2 -= 1
629       elif tmp == "shared 3":
630         shrcnt3 -= 1
631       elif tmp == "shared 4":
632         shrcnt4 -= 1
633     self.assertEqual(shrcnt1, 0)
634     self.assertEqual(shrcnt2, 0)
635     self.assertEqual(shrcnt3, 0)
636     self.assertEqual(shrcnt3, 0)
637
638     for _ in range(3):
639       self.assertEqual(self.done.get_nowait(), "exclusive 1")
640
641     for _ in range(3):
642       self.assertEqual(self.done.get_nowait(), "exclusive 2")
643
644     self.assertRaises(Queue.Empty, self.done.get_nowait)
645
646   def testIllegalDowngrade(self):
647     # Not yet acquired
648     self.assertRaises(AssertionError, self.sl.downgrade)
649
650     # Acquire in shared mode, downgrade should be no-op
651     self.assertTrue(self.sl.acquire(shared=1))
652     self.assertTrue(self.sl.is_owned(shared=1))
653     self.assertTrue(self.sl.downgrade())
654     self.assertTrue(self.sl.is_owned(shared=1))
655     self.sl.release()
656
657   def testDowngrade(self):
658     self.assertTrue(self.sl.acquire())
659     self.assertTrue(self.sl.is_owned(shared=0))
660     self.assertTrue(self.sl.downgrade())
661     self.assertTrue(self.sl.is_owned(shared=1))
662     self.sl.release()
663
  @_Repeat
  def testDowngradeJumpsAheadOfExclusive(self):
    # Thread function: acquires in exclusive mode, downgrades once
    # "ev_downgrade" is set and releases once "ev_release" is set
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
      self.assertTrue(self.sl.acquire())
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_got.set()
      ev_downgrade.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.assertTrue(self.sl.downgrade())
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    # Thread function: acquires in exclusive mode, with the setter of
    # "ev_started" as test_notify callback, and releases once "ev_release" is
    # set
    def _KeepExclusive2(ev_started, ev_release):
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.sl.release()

    # Thread function: acquires in shared mode, sets "ev_got" afterwards and
    # releases once "ev_release" is set
    def _KeepShared(ev_started, ev_got, ev_release):
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_got.set()
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    # Acquire lock in exclusive mode
    ev_got_excl1 = threading.Event()
    ev_downgrade_excl1 = threading.Event()
    ev_release_excl1 = threading.Event()
    th_excl1 = self._addThread(target=_KeepExclusive,
                               args=(ev_got_excl1, ev_downgrade_excl1,
                                     ev_release_excl1))
    ev_got_excl1.wait()

    # Start a second exclusive acquire
    ev_started_excl2 = threading.Event()
    ev_release_excl2 = threading.Event()
    th_excl2 = self._addThread(target=_KeepExclusive2,
                               args=(ev_started_excl2, ev_release_excl2))
    ev_started_excl2.wait()

    # Start shared acquires, will jump ahead of second exclusive acquire when
    # first exclusive acquire downgrades
    # (one pair of "started" and "got" events per shared acquire)
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
    ev_release_shared = threading.Event()

    th_shared = [self._addThread(target=_KeepShared,
                                 args=(ev_started, ev_got, ev_release_shared))
                 for (ev_started, ev_got) in ev_shared]

    # Wait for all shared acquires to start
    for (ev, _) in ev_shared:
      ev.wait()

    # Check lock information
    # (owned exclusively by the first thread; the second exclusive acquire and
    # all shared acquires are pending)
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [("exclusive", [th_excl2.getName()]),
                      ("shared", sorted(th.getName() for th in th_shared))])

    # Shared acquires won't start until the exclusive lock is downgraded
    ev_downgrade_excl1.set()

    # Wait for all shared acquires to be successful
    for (_, ev) in ev_shared:
      ev.wait()

    # Check lock information again
    # (now in shared mode, owned by the first thread and all sharers; only the
    # second exclusive acquire is still pending)
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
                                              query.LQ_PENDING])),
                     [(self.sl.name, "shared", None,
                       [("exclusive", [th_excl2.getName()])])])
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
    self.assertEqual(set(owner), set([th_excl1.getName()] +
                                     [th.getName() for th in th_shared]))

    # Let all threads release the lock
    ev_release_excl1.set()
    ev_release_excl2.set()
    ev_release_shared.set()

    self._waitThreads()

    # Neither mode nor owner nor pending acquires must be left
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
                                              query.LQ_PENDING])),
                     [(self.sl.name, None, None, [])])
756
  @_Repeat
  def testMixedAcquireTimeout(self):
    # Set by the main thread to make the shared holders release the lock
    sync = threading.Event()

    # Thread function: acquires in shared mode, reports "shared", sets "ev"
    # and holds the lock until "sync" is set
    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    # (expected to time out, the lock is held in shared mode)
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    # Thread function: acquires in exclusive mode, reports "exclusive", sets
    # "exclev" and holds the lock until "exclsync" is set
    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    # (expected to time out again, the shared holders weren't released yet)
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    # (expected to time out, the lock is now held by the exclusive thread)
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Thread function: acquires in shared mode, reports "shared2" and releases
    # right away
    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence
    # (three shared holders, then the exclusive one, then the ten sharers
    # started last)
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)
851
  def testPriority(self):
    # Acquire in exclusive mode
    self.assert_(self.sl.acquire(shared=0))

    # Queue acquires
    # (every thread waits for the "prev" event before acquiring and passes
    # the setter of its "next" event as test_notify callback, chaining the
    # acquires one after another; "result" is reported on the done queue)
    def _Acquire(prev, next, shared, priority, result):
      prev.wait()
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        # One exclusive and one shared acquire per chosen priority, in random
        # order
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged
              # (names and threads are added to the sets of the existing
              # entry, which is also referenced from perprio)
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information
    # (without requested fields everything but the name is None)
    self.assertEqual(self.sl.GetLockInfo(set()),
                     [(self.sl.name, None, None, None)])
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive",
                       [threading.currentThread().getName()], None)])

    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
                            perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assert_(self.sl._check_empty())

    # Check acquires by priority
    # (results have to arrive sorted by ascending priority number)
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one.
        # (set.remove raises KeyError for a result arriving out of order)
        while names:
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)
949
950   def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
951     self.assertEqual(name, self.sl.name)
952     self.assert_(mode is None)
953     self.assert_(owner is None)
954
955     self.assertEqual([(pendmode, sorted(waiting))
956                       for (pendmode, waiting) in pending],
957                      [(["exclusive", "shared"][int(bool(shared))],
958                        sorted(t.getName() for t in threads))
959                       for acquires in [perprio[i]
960                                        for i in sorted(perprio.keys())]
961                       for (shared, _, threads) in acquires])
962
963   class _FakeTimeForSpuriousNotifications:
964     def __init__(self, now, check_end):
965       self.now = now
966       self.check_end = check_end
967
968       # Deterministic random number generator
969       self.rnd = random.Random(15086)
970
971     def time(self):
972       # Advance time if the random number generator thinks so (this is to test
973       # multiple notifications without advancing the time)
974       if self.rnd.random() < 0.3:
975         self.now += self.rnd.random()
976
977       self.check_end(self.now)
978
979       return self.now
980
  @_Repeat
  def testAcquireTimeoutWithSpuriousNotifications(self):
    """Tests an acquire with timeout while spurious notifications are sent.

    The lock is given a fake time function. Each time the lock asks for the
    time, C{check_end} verifies the waiting thread has not got the lock yet
    and tells the main thread (through the C{req} queue) whether to send
    another notification or to release the lock.

    """
    ready = threading.Event()
    locked = threading.Event()
    req = Queue.Queue(0)

    # Start of the fake clock and timeout used by the waiting thread
    epoch = 4000.0
    timeout = 60.0

    def check_end(now):
      # Called by the fake clock with the current virtual time
      self.assertFalse(locked.isSet())

      # If we waited long enough (in virtual time), tell main thread to release
      # lock, otherwise tell it to notify once more
      req.put(now < (epoch + (timeout * 0.8)))

    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time

    sl = locking.SharedLock("test", _time_fn=time_fn)

    # Acquire in exclusive mode
    sl.acquire(shared=0)

    def fn():
      # Runs in a separate thread; "ready" is passed as the test_notify
      # callback and the main thread waits for it before notifying
      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
                                 test_notify=ready.set))
      locked.set()
      sl.release()
      self.done.put("success")

    # Start acquire with timeout and wait for it to be ready
    self._addThread(target=fn)
    ready.wait()

    # The separate thread is now waiting to acquire the lock, so start sending
    # spurious notifications.

    # Wait for separate thread to ask for another notification
    count = 0
    while req.get():
      # After sending the notification, the lock will take a short amount of
      # time to notice and to retrieve the current time
      sl._notify_topmost()
      count += 1

    self.assertTrue(count > 100, "Not enough notifications were sent")

    # None of the notifications may have led to the lock being acquired
    self.assertFalse(locked.isSet())

    # Some notifications have been sent, now actually release the lock
    sl.release()

    # Wait for lock to be acquired
    locked.wait()

    self._waitThreads()

    self.assertEqual(self.done.get_nowait(), "success")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1040
1041
class TestSharedLockInCondition(_ThreadedTestCase):
  """SharedLock as a condition lock tests"""

  def setUp(self):
    """Creates the shared lock and the condition wrapping it"""
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Creates the condition object under test"""
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    """Checks the lock mode is the same before and after waiting"""
    # First in shared mode, then in exclusive mode
    for mode in (1, 0):
      self.cond.acquire(shared=mode)
      self.assertTrue(self.sl.is_owned(shared=mode))
      self.cond.wait(0)
      self.assertTrue(self.sl.is_owned(shared=mode))
      self.cond.release()
1064
1065
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """SharedLock as a pipe condition lock tests"""

  def setCondition(self):
    # The inherited tests run against locking.PipeCondition instead of the
    # threading.Condition created by the parent class
    self.cond = locking.PipeCondition(self.sl)
1071
1072
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Puts "EXC" into the result queue while owning the decorator lock"""
    self.assertTrue(_decoratorlock.is_owned())
    self.done.put("EXC")

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Puts "SHR" into the result queue while sharing the decorator lock"""
    self.assertTrue(_decoratorlock.is_owned(shared=1))
    self.done.put("SHR")

  def testDecoratedFunctions(self):
    """Checks the lock is no longer owned once a decorated call returns"""
    for decorated in (self._doItExclusive, self._doItSharer):
      decorated()
      self.assertFalse(_decoratorlock.is_owned())

  def testSharersCanCoexist(self):
    """Checks a decorated sharer runs while the lock is held shared"""
    _decoratorlock.acquire(shared=1)
    sharer = threading.Thread(target=self._doItSharer)
    sharer.start()
    # The sharer must report back within one second
    self.assertTrue(self.done.get(True, 1))
    _decoratorlock.release()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    """Checks an exclusive holder keeps a decorated exclusive call waiting"""
    _decoratorlock.acquire()
    self._addThread(target=self._doItExclusive)
    # Nothing may have been reported as long as the lock is held here
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")

  @_Repeat
  def testExclusiveBlocksSharer(self):
    """Checks an exclusive holder keeps a decorated sharer waiting"""
    _decoratorlock.acquire()
    self._addThread(target=self._doItSharer)
    # Nothing may have been reported as long as the lock is held here
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "SHR")

  @_Repeat
  def testSharerBlocksExclusive(self):
    """Checks a sharer keeps a decorated exclusive call waiting"""
    _decoratorlock.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    # Nothing may have been reported as long as the lock is held here
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")
1128
1129
1130 class TestLockSet(_ThreadedTestCase):
1131   """LockSet tests"""
1132
  def setUp(self):
    """Runs the base class setup and creates the lock set for the test"""
    _ThreadedTestCase.setUp(self)
    self._setUpLS()
1136
1137   def _setUpLS(self):
1138     """Helper to (re)initialize the lock set"""
1139     self.resources = ["one", "two", "three"]
1140     self.ls = locking.LockSet(self.resources, "TestLockSet")
1141
1142   def testResources(self):
1143     self.assertEquals(self.ls._names(), set(self.resources))
1144     newls = locking.LockSet([], "TestLockSet.testResources")
1145     self.assertEquals(newls._names(), set())
1146
1147   def testCheckOwnedUnknown(self):
1148     self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1149     for shared in [-1, 0, 1, 6378, 24255]:
1150       self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1151                                            shared=shared))
1152
1153   def testCheckOwnedUnknownWhileHolding(self):
1154     self.assertFalse(self.ls.check_owned([]))
1155     self.ls.acquire("one", shared=1)
1156     self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1157     self.assertTrue(self.ls.check_owned("one", shared=1))
1158     self.assertFalse(self.ls.check_owned("one", shared=0))
1159     self.assertFalse(self.ls.check_owned(["one", "two"]))
1160     self.assertRaises(errors.LockError, self.ls.check_owned,
1161                       ["one", "nonexist"])
1162     self.assertRaises(errors.LockError, self.ls.check_owned, "")
1163     self.ls.release()
1164     self.assertFalse(self.ls.check_owned([]))
1165     self.assertFalse(self.ls.check_owned("one"))
1166
1167   def testAcquireRelease(self):
1168     self.assertFalse(self.ls.check_owned(self.ls._names()))
1169     self.assert_(self.ls.acquire("one"))
1170     self.assertEquals(self.ls.list_owned(), set(["one"]))
1171     self.assertTrue(self.ls.check_owned("one"))
1172     self.assertTrue(self.ls.check_owned("one", shared=0))
1173     self.assertFalse(self.ls.check_owned("one", shared=1))
1174     self.ls.release()
1175     self.assertEquals(self.ls.list_owned(), set())
1176     self.assertFalse(self.ls.check_owned(self.ls._names()))
1177     self.assertEquals(self.ls.acquire(["one"]), set(["one"]))
1178     self.assertEquals(self.ls.list_owned(), set(["one"]))
1179     self.ls.release()
1180     self.assertEquals(self.ls.list_owned(), set())
1181     self.ls.acquire(["one", "two", "three"])
1182     self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1183     self.assertTrue(self.ls.check_owned(self.ls._names()))
1184     self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1185     self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1186     self.ls.release("one")
1187     self.assertFalse(self.ls.check_owned(["one"]))
1188     self.assertTrue(self.ls.check_owned(["two", "three"]))
1189     self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1190     self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1191     self.assertEquals(self.ls.list_owned(), set(["two", "three"]))
1192     self.ls.release(["three"])
1193     self.assertEquals(self.ls.list_owned(), set(["two"]))
1194     self.ls.release()
1195     self.assertEquals(self.ls.list_owned(), set())
1196     self.assertEquals(self.ls.acquire(["one", "three"]), set(["one", "three"]))
1197     self.assertEquals(self.ls.list_owned(), set(["one", "three"]))
1198     self.ls.release()
1199     self.assertEquals(self.ls.list_owned(), set())
1200     for name in self.ls._names():
1201       self.assertFalse(self.ls.check_owned(name))
1202
1203   def testNoDoubleAcquire(self):
1204     self.ls.acquire("one")
1205     self.assertRaises(AssertionError, self.ls.acquire, "one")
1206     self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1207     self.assertRaises(AssertionError, self.ls.acquire, ["two", "three"])
1208     self.ls.release()
1209     self.ls.acquire(["one", "three"])
1210     self.ls.release("one")
1211     self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1212     self.ls.release("three")
1213
  def testNoWrongRelease(self):
    """Checks that releasing locks which are not held is refused"""
    # Nothing has been acquired yet
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire("one")
    # Only "one" is held at this point, not "two"
    self.assertRaises(AssertionError, self.ls.release, "two")
1218
1219   def testAddRemove(self):
1220     self.ls.add("four")
1221     self.assertEquals(self.ls.list_owned(), set())
1222     self.assert_("four" in self.ls._names())
1223     self.ls.add(["five", "six", "seven"], acquired=1)
1224     self.assert_("five" in self.ls._names())
1225     self.assert_("six" in self.ls._names())
1226     self.assert_("seven" in self.ls._names())
1227     self.assertEquals(self.ls.list_owned(), set(["five", "six", "seven"]))
1228     self.assertEquals(self.ls.remove(["five", "six"]), ["five", "six"])
1229     self.assert_("five" not in self.ls._names())
1230     self.assert_("six" not in self.ls._names())
1231     self.assertEquals(self.ls.list_owned(), set(["seven"]))
1232     self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
1233     self.ls.remove("seven")
1234     self.assert_("seven" not in self.ls._names())
1235     self.assertEquals(self.ls.list_owned(), set([]))
1236     self.ls.acquire(None, shared=1)
1237     self.assertRaises(AssertionError, self.ls.add, "eight")
1238     self.ls.release()
1239     self.ls.acquire(None)
1240     self.ls.add("eight", acquired=1)
1241     self.assert_("eight" in self.ls._names())
1242     self.assert_("eight" in self.ls.list_owned())
1243     self.ls.add("nine")
1244     self.assert_("nine" in self.ls._names())
1245     self.assert_("nine" not in self.ls.list_owned())
1246     self.ls.release()
1247     self.ls.remove(["two"])
1248     self.assert_("two" not in self.ls._names())
1249     self.ls.acquire("three")
1250     self.assertEquals(self.ls.remove(["three"]), ["three"])
1251     self.assert_("three" not in self.ls._names())
1252     self.assertEquals(self.ls.remove("three"), [])
1253     self.assertEquals(self.ls.remove(["one", "three", "six"]), ["one"])
1254     self.assert_("one" not in self.ls._names())
1255
1256   def testRemoveNonBlocking(self):
1257     self.ls.acquire("one")
1258     self.assertEquals(self.ls.remove("one"), ["one"])
1259     self.ls.acquire(["two", "three"])
1260     self.assertEquals(self.ls.remove(["two", "three"]),
1261                       ["two", "three"])
1262
  def testNoDoubleAdd(self):
    """Checks that adding an already existing lock name is refused"""
    # "two" is part of the initial lock set
    self.assertRaises(errors.LockError, self.ls.add, "two")
    self.ls.add("four")
    # "four" exists now, so a second add has to fail as well
    self.assertRaises(errors.LockError, self.ls.add, "four")
1267
1268   def testNoWrongRemoves(self):
1269     self.ls.acquire(["one", "three"], shared=1)
1270     # Cannot remove "two" while holding something which is not a superset
1271     self.assertRaises(AssertionError, self.ls.remove, "two")
1272     # Cannot remove "three" as we are sharing it
1273     self.assertRaises(AssertionError, self.ls.remove, "three")
1274
1275   def testAcquireSetLock(self):
1276     # acquire the set-lock exclusively
1277     self.assertEquals(self.ls.acquire(None), set(["one", "two", "three"]))
1278     self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1279     self.assertEquals(self.ls.is_owned(), True)
1280     self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1281     # I can still add/remove elements...
1282     self.assertEquals(self.ls.remove(["two", "three"]), ["two", "three"])
1283     self.assert_(self.ls.add("six"))
1284     self.ls.release()
1285     # share the set-lock
1286     self.assertEquals(self.ls.acquire(None, shared=1), set(["one", "six"]))
1287     # adding new elements is not possible
1288     self.assertRaises(AssertionError, self.ls.add, "five")
1289     self.ls.release()
1290
1291   def testAcquireWithRepetitions(self):
1292     self.assertEquals(self.ls.acquire(["two", "two", "three"], shared=1),
1293                       set(["two", "two", "three"]))
1294     self.ls.release(["two", "two"])
1295     self.assertEquals(self.ls.list_owned(), set(["three"]))
1296
1297   def testEmptyAcquire(self):
1298     # Acquire an empty list of locks...
1299     self.assertEquals(self.ls.acquire([]), set())
1300     self.assertEquals(self.ls.list_owned(), set())
1301     # New locks can still be addded
1302     self.assert_(self.ls.add("six"))
1303     # "re-acquiring" is not an issue, since we had really acquired nothing
1304     self.assertEquals(self.ls.acquire([], shared=1), set())
1305     self.assertEquals(self.ls.list_owned(), set())
1306     # We haven't really acquired anything, so we cannot release
1307     self.assertRaises(AssertionError, self.ls.release)
1308
  def _doLockSet(self, names, shared):
    """Acquires and releases locks, reporting the outcome in C{self.done}.

    Used as a thread target by the concurrency tests.

    @param names: Lock name(s) passed on to C{self.ls.acquire}
    @param shared: Value for the C{shared} argument of the acquire

    """
    try:
      self.ls.acquire(names, shared=shared)
      self.done.put("DONE")
      self.ls.release()
    except errors.LockError:
      # A lock error anywhere above is reported instead of being raised
      self.done.put("ERR")
1316
  def _doAddSet(self, names):
    """Adds locks in acquired state, reporting the outcome in C{self.done}.

    Used as a thread target by the concurrency tests.

    @param names: Lock name(s) passed on to C{self.ls.add}

    """
    try:
      self.ls.add(names, acquired=1)
      self.done.put("DONE")
      self.ls.release()
    except errors.LockError:
      # A lock error anywhere above is reported instead of being raised
      self.done.put("ERR")
1324
1325   def _doRemoveSet(self, names):
1326     self.done.put(self.ls.remove(names))
1327
  @_Repeat
  def testConcurrentSharedAcquire(self):
    """Tests what other threads can acquire while two locks are held shared"""
    self.ls.acquire(["one", "two"], shared=1)
    # Shared acquires by other threads finish while the locks are held here
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    self._addThread(target=self._doLockSet, args=(["one", "two", "three"], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    self._addThread(target=self._doLockSet, args=("three", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    # Exclusive acquires involving a held lock must not have finished before
    # the locks are released
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
    self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")
    self.assertEqual(self.done.get_nowait(), "DONE")
1347
1348   @_Repeat
1349   def testConcurrentExclusiveAcquire(self):
1350     self.ls.acquire(["one", "two"])
1351     self._addThread(target=self._doLockSet, args=("three", 1))
1352     self._waitThreads()
1353     self.assertEqual(self.done.get_nowait(), "DONE")
1354     self._addThread(target=self._doLockSet, args=("three", 0))
1355     self._waitThreads()
1356     self.assertEqual(self.done.get_nowait(), "DONE")
1357     self.assertRaises(Queue.Empty, self.done.get_nowait)
1358     self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1359     self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1360     self._addThread(target=self._doLockSet, args=("one", 0))
1361     self._addThread(target=self._doLockSet, args=("one", 1))
1362     self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1363     self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
1364     self.assertRaises(Queue.Empty, self.done.get_nowait)
1365     self.ls.release()
1366     self._waitThreads()
1367     for _ in range(6):
1368       self.failUnlessEqual(self.done.get_nowait(), "DONE")
1369
  @_Repeat
  def testSimpleAcquireTimeoutExpiring(self):
    """Tests that acquires with a timeout give up on locks held elsewhere.

    For each C{(wanted, block)} pair the main thread holds C{block} in
    exclusive mode while another thread tries to get C{wanted} with a timeout
    of 0.1 seconds.

    """
    names = sorted(self.ls._names())
    self.assert_(len(names) >= 3)

    # Get name of first lock
    first = names[0]

    # Get name of last lock
    last = names.pop()

    checks = [
      # Block first and try to lock it again
      (first, first),

      # Block first and try to lock all locks
      (None, first),

      # Block last and try to lock it again
      (last, last),
      ]

    for (wanted, block) in checks:
      # Lock in exclusive mode
      self.assert_(self.ls.acquire(block, shared=0))

      def _AcquireOne():
        # Try to get the same lock again with a timeout (should never succeed)
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
        if acquired:
          self.done.put("acquired")
          self.ls.release()
        else:
          # The expected result of a timed out acquire is None, with nothing
          # left owned by the thread
          self.assert_(acquired is None)
          self.assertFalse(self.ls.list_owned())
          self.assertFalse(self.ls.is_owned())
          self.done.put("not acquired")

      self._addThread(target=_AcquireOne)

      # Wait for timeout in thread to expire
      self._waitThreads()

      # Release exclusive lock again
      self.ls.release()

      self.assertEqual(self.done.get_nowait(), "not acquired")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1418
  @_Repeat
  def testDelayedAndExpiringLockAcquire(self):
    """Tests acquiring all locks while they are released one after another.

    The main thread holds all locks and releases each of them once the other
    thread has signalled (via C{test_notify}) that it started acquiring it.
    In the first pass the acquire is expected to succeed, in the second pass
    (C{expire}) the releases are delayed and the acquire is expected to time
    out.

    """
    self._setUpLS()
    self.ls.add(["five", "six", "seven", "eight", "nine"])

    for expire in (False, True):
      names = sorted(self.ls._names())
      self.assertEqual(len(names), 8)

      # One event per lock, set once the acquire of that lock has started
      lock_ev = dict([(i, threading.Event()) for i in names])

      # Lock all in exclusive mode
      self.assert_(self.ls.acquire(names, shared=0))

      if expire:
        # We'll wait at least 300ms per lock
        lockwait = len(names) * [0.3]

        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
        # this gives us up to 2.4s to fail.
        lockall_timeout = 0.4
      else:
        # This should finish rather quickly
        lockwait = None
        lockall_timeout = len(names) * 5.0

      def _LockAll():
        def acquire_notification(name):
          if not expire:
            self.done.put("getting %s" % name)

          # Kick next lock
          lock_ev[name].set()

        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                           test_notify=acquire_notification):
          self.done.put("got all")
          self.ls.release()
        else:
          self.done.put("timeout on all")

        # Notify all locks
        for ev in lock_ev.values():
          ev.set()

      t = self._addThread(target=_LockAll)

      for idx, name in enumerate(names):
        # Wait for actual acquire on this lock to start
        lock_ev[name].wait(10.0)

        if expire and t.isAlive():
          # Wait some time after getting the notification to make sure the lock
          # acquire will expire
          SafeSleep(lockwait[idx])

        self.ls.release(names=name)

      self.assertFalse(self.ls.list_owned())

      self._waitThreads()

      if expire:
        # Not checking which locks were actually acquired. Doing so would be
        # too timing-dependent.
        self.assertEqual(self.done.get_nowait(), "timeout on all")
      else:
        # One message per lock, in sorted order, followed by the final one
        for i in names:
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
        self.assertEqual(self.done.get_nowait(), "got all")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1490
1491   @_Repeat
1492   def testConcurrentRemove(self):
1493     self.ls.add("four")
1494     self.ls.acquire(["one", "two", "four"])
1495     self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
1496     self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
1497     self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1498     self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1499     self.assertRaises(Queue.Empty, self.done.get_nowait)
1500     self.ls.remove("one")
1501     self.ls.release()
1502     self._waitThreads()
1503     for i in range(4):
1504       self.failUnlessEqual(self.done.get_nowait(), "ERR")
1505     self.ls.add(["five", "six"], acquired=1)
1506     self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
1507     self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
1508     self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
1509     self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
1510     self.ls.remove("five")
1511     self.ls.release()
1512     self._waitThreads()
1513     for i in range(4):
1514       self.failUnlessEqual(self.done.get_nowait(), "DONE")
1515     self.ls.acquire(["three", "four"])
1516     self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
1517     self.assertRaises(Queue.Empty, self.done.get_nowait)
1518     self.ls.remove("four")
1519     self._waitThreads()
1520     self.assertEqual(self.done.get_nowait(), ["six"])
1521     self._addThread(target=self._doRemoveSet, args=(["two"]))
1522     self._waitThreads()
1523     self.assertEqual(self.done.get_nowait(), ["two"])
1524     self.ls.release()
1525     # reset lockset
1526     self._setUpLS()
1527
1528   @_Repeat
1529   def testConcurrentSharedSetLock(self):
1530     # share the set-lock...
1531     self.ls.acquire(None, shared=1)
1532     # ...another thread can share it too
1533     self._addThread(target=self._doLockSet, args=(None, 1))
1534     self._waitThreads()
1535     self.assertEqual(self.done.get_nowait(), "DONE")
1536     # ...or just share some elements
1537     self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
1538     self._waitThreads()
1539     self.assertEqual(self.done.get_nowait(), "DONE")
1540     # ...but not add new ones or remove any
1541     t = self._addThread(target=self._doAddSet, args=(["nine"]))
1542     self._addThread(target=self._doRemoveSet, args=(["two"], ))
1543     self.assertRaises(Queue.Empty, self.done.get_nowait)
1544     # this just releases the set-lock
1545     self.ls.release([])
1546     t.join(60)
1547     self.assertEqual(self.done.get_nowait(), "DONE")
1548     # release the lock on the actual elements so remove() can proceed too
1549     self.ls.release()
1550     self._waitThreads()
1551     self.failUnlessEqual(self.done.get_nowait(), ["two"])
1552     # reset lockset
1553     self._setUpLS()
1554
1555   @_Repeat
1556   def testConcurrentExclusiveSetLock(self):
1557     # acquire the set-lock...
1558     self.ls.acquire(None, shared=0)
1559     # ...no one can do anything else
1560     self._addThread(target=self._doLockSet, args=(None, 1))
1561     self._addThread(target=self._doLockSet, args=(None, 0))
1562     self._addThread(target=self._doLockSet, args=(["three"], 0))
1563     self._addThread(target=self._doLockSet, args=(["two"], 1))
1564     self._addThread(target=self._doAddSet, args=(["nine"]))
1565     self.assertRaises(Queue.Empty, self.done.get_nowait)
1566     self.ls.release()
1567     self._waitThreads()
1568     for _ in range(5):
1569       self.assertEqual(self.done.get(True, 1), "DONE")
1570     # cleanup
1571     self._setUpLS()
1572
1573   @_Repeat
1574   def testConcurrentSetLockAdd(self):
1575     self.ls.acquire("one")
1576     # Another thread wants the whole SetLock
1577     self._addThread(target=self._doLockSet, args=(None, 0))
1578     self._addThread(target=self._doLockSet, args=(None, 1))
1579     self.assertRaises(Queue.Empty, self.done.get_nowait)
1580     self.assertRaises(AssertionError, self.ls.add, "four")
1581     self.ls.release()
1582     self._waitThreads()
1583     self.assertEqual(self.done.get_nowait(), "DONE")
1584     self.assertEqual(self.done.get_nowait(), "DONE")
1585     self.ls.acquire(None)
1586     self._addThread(target=self._doLockSet, args=(None, 0))
1587     self._addThread(target=self._doLockSet, args=(None, 1))
1588     self.assertRaises(Queue.Empty, self.done.get_nowait)
1589     self.ls.add("four")
1590     self.ls.add("five", acquired=1)
1591     self.ls.add("six", acquired=1, shared=1)
1592     self.assertEquals(self.ls.list_owned(),
1593       set(["one", "two", "three", "five", "six"]))
1594     self.assertEquals(self.ls.is_owned(), True)
1595     self.assertEquals(self.ls._names(),
1596       set(["one", "two", "three", "four", "five", "six"]))
1597     self.ls.release()
1598     self._waitThreads()
1599     self.assertEqual(self.done.get_nowait(), "DONE")
1600     self.assertEqual(self.done.get_nowait(), "DONE")
1601     self._setUpLS()
1602
1603   @_Repeat
1604   def testEmptyLockSet(self):
1605     # get the set-lock
1606     self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
1607     # now empty it...
1608     self.ls.remove(["one", "two", "three"])
1609     # and adds/locks by another thread still wait
1610     self._addThread(target=self._doAddSet, args=(["nine"]))
1611     self._addThread(target=self._doLockSet, args=(None, 1))
1612     self._addThread(target=self._doLockSet, args=(None, 0))
1613     self.assertRaises(Queue.Empty, self.done.get_nowait)
1614     self.ls.release()
1615     self._waitThreads()
1616     for _ in range(3):
1617       self.assertEqual(self.done.get_nowait(), "DONE")
1618     # empty it again...
1619     self.assertEqual(self.ls.remove(["nine"]), ["nine"])
1620     # now share it...
1621     self.assertEqual(self.ls.acquire(None, shared=1), set())
1622     # other sharers can go, adds still wait
1623     self._addThread(target=self._doLockSet, args=(None, 1))
1624     self._waitThreads()
1625     self.assertEqual(self.done.get_nowait(), "DONE")
1626     self._addThread(target=self._doAddSet, args=(["nine"]))
1627     self.assertRaises(Queue.Empty, self.done.get_nowait)
1628     self.ls.release()
1629     self._waitThreads()
1630     self.assertEqual(self.done.get_nowait(), "DONE")
1631     self._setUpLS()
1632
1633   def testAcquireWithNamesDowngrade(self):
1634     self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1635     self.assertTrue(self.ls.is_owned())
1636     self.assertFalse(self.ls._get_lock().is_owned())
1637     self.ls.release()
1638     self.assertFalse(self.ls.is_owned())
1639     self.assertFalse(self.ls._get_lock().is_owned())
1640     # Can't downgrade after releasing
1641     self.assertRaises(AssertionError, self.ls.downgrade, "two")
1642
  def testDowngrade(self):
    """Tests downgrading the locks of a fully acquired set one by one.

    After each downgrade the mode of every lock and of the lockset-internal
    lock is verified.

    """
    # Not owning anything, must raise an exception
    self.assertFalse(self.ls.is_owned())
    self.assertRaises(AssertionError, self.ls.downgrade)

    self.assertFalse(compat.any(i.is_owned()
                                for i in self.ls._get_lockdict().values()))
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    for name in self.ls._names():
      self.assertFalse(self.ls.check_owned(name))

    # Acquire everything in exclusive mode
    self.assertEquals(self.ls.acquire(None, shared=0),
                      set(["one", "two", "three"]))
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")

    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
    for name in self.ls._names():
      self.assertTrue(self.ls.check_owned(name))
      self.assertTrue(self.ls.check_owned(name, shared=0))
      self.assertFalse(self.ls.check_owned(name, shared=1))

    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    self.assertTrue(compat.all(i.is_owned(shared=0)
                               for i in self.ls._get_lockdict().values()))

    # Start downgrading locks
    self.assertTrue(self.ls.downgrade(names=["one"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    # Only "one" is expected to be shared (1), all others exclusive (0)
    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertTrue(self.ls.check_owned("two", shared=0))
    self.assertTrue(self.ls.check_owned("three", shared=0))

    # Downgrade second lock
    self.assertTrue(self.ls.downgrade(names="two"))
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    # Expected mode per lock name: shared for "one" and "two" only
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertFalse(self.ls.check_owned("two", shared=0))
    self.assertTrue(self.ls.check_owned("two", shared=1))
    self.assertTrue(self.ls.check_owned("three", shared=0))

    # Downgrading the last exclusive lock to shared must downgrade the
    # lockset-internal lock too
    self.assertTrue(self.ls.downgrade(names="three"))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.assertTrue(compat.all(i.is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    # Verify owned locks
    for name in self.ls._names():
      self.assertTrue(self.ls.check_owned(name, shared=1))

    # Downgrading a shared lock must be a no-op
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.assertTrue(compat.all(i.is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    self.ls.release()
1712
1713   def testDowngradeEverything(self):
1714     self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
1715                      set(["one", "two", "three"]))
1716
1717     # Ensure all locks are now owned in exclusive mode
1718     for name in self.ls._names():
1719       self.assertTrue(self.ls.check_owned(name, shared=0))
1720
1721     # Downgrade everything
1722     self.assertTrue(self.ls.downgrade())
1723
1724     # Ensure all locks are now owned in shared mode
1725     for name in self.ls._names():
1726       self.assertTrue(self.ls.check_owned(name, shared=1))
1727
1728   def testPriority(self):
1729     def _Acquire(prev, next, name, priority, success_fn):
1730       prev.wait()
1731       self.assert_(self.ls.acquire(name, shared=0,
1732                                    priority=priority,
1733                                    test_notify=lambda _: next.set()))
1734       try:
1735         success_fn()
1736       finally:
1737         self.ls.release()
1738
1739     # Get all in exclusive mode
1740     self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1741
1742     done_two = Queue.Queue(0)
1743
1744     first = threading.Event()
1745     prev = first
1746
1747     acquires = [("one", prio, self.done) for prio in range(1, 33)]
1748     acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1749
1750     # Use a deterministic random generator
1751     random.Random(741).shuffle(acquires)
1752
1753     for (name, prio, done) in acquires:
1754       ev = threading.Event()
1755       self._addThread(target=_Acquire,
1756                       args=(prev, ev, name, prio,
1757                             compat.partial(done.put, "Prio%s" % prio)))
1758       prev = ev
1759
1760     # Start acquires
1761     first.set()
1762
1763     # Wait for last acquire to start
1764     prev.wait()
1765
1766     # Let threads acquire locks
1767     self.ls.release()
1768
1769     # Wait for threads to finish
1770     self._waitThreads()
1771
1772     for i in range(1, 33):
1773       self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1774       self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1775
1776     self.assertRaises(Queue.Empty, self.done.get_nowait)
1777     self.assertRaises(Queue.Empty, done_two.get_nowait)
1778
1779
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for C{locking.GanetiLockManager}

  Every test starts with a fresh manager in C{self.GL}, created with two
  nodes, two node groups, three instances and three networks.

  """
  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.nodes = ["n1", "n2"]
    self.nodegroups = ["g1", "g2"]
    self.instances = ["i1", "i2", "i3"]
    self.networks = ["net1", "net2", "net3"]
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
                                        self.instances, self.networks)

  def tearDown(self):
    # Don't try this at home...
    # Creating a second manager fails (see testDoubleGLFails), so the
    # remembered instance is dropped to let the next test build its own
    locking.GanetiLockManager._instance = None

  def _CheckInitialNames(self, nodes, nodegroups, instances, networks):
    """Recreate the lock manager and verify the lock names of every level.

    Replaces C{self.GL} with a manager built from the given names.

    @param nodes: node lock names to create the manager with
    @param nodegroups: node group lock names to create the manager with
    @param instances: instance lock names to create the manager with
    @param networks: network lock names to create the manager with

    """
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager(nodes, nodegroups,
                                        instances, networks)
    # The cluster level always consists of the BGL only
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(instances))
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set(networks))

  def testLockingConstants(self):
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    # setUp already created a manager, a second one must be refused
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])

  def testLockNames(self):
    # The manager from setUp must expose exactly the names it was given
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
                     set(self.networks))

  def testInitAndResources(self):
    # Managers created with only some (or none) of the resource lists must
    # have empty lock sets at the remaining levels
    self._CheckInitialNames([], [], [], [])
    self._CheckInitialNames(self.nodes, self.nodegroups, [], [])
    self._CheckInitialNames([], [], self.instances, [])
    self._CheckInitialNames([], [], [], self.networks)

  def testAcquireRelease(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
    self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
    self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
                                        shared=1))
    # Only "i1" is held, so a check including "i3" must fail
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))

    # Releasing a single node lock leaves everything else untouched
    self.GL.release(locking.LEVEL_NODE, ["n2"])
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))

    # Releasing a level without names drops what is left at that level
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)

    # "i5" was never part of the instance lock set
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ["i5"])
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))

  def testAcquireWholeSets(self):
    # Passing None as names acquires every lock of the level
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                     set(self.nodegroups))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    # A whole level (instances) combined with a single lock of another level
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
                     set(["n2"]))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(["n2"]))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    # Nothing can be acquired at the other levels without holding the BGL
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ["n1", "n2"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ["i3"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ["g1"])

    # The BGL can't be released while node locks are held ...
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ["n1"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ["BGL"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)

    # ... nor while instance locks are held ...
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ["BGL"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)

    # ... nor while some node group locks remain after a partial release
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
    self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ["BGL"])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testWrongOrder(self):
    # With a node lock held, acquiring more locks at the node level or at
    # the node group and instance levels must be refused
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ["n2"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ["n1"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ["g1"])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ["i2"])

  def testModifiableLevels(self):
    # The cluster level can't be extended ...
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
                      ["BGL2"])

    # ... while locks can be added to and removed from the instance, node
    # and node group levels once the BGL is held
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
    self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
    self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
    self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
    self.GL.add(locking.LEVEL_NODE, ["n3"])
    self.GL.remove(locking.LEVEL_NODE, ["n1"])
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
    self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))

    # Removing from the cluster level is refused as well
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
                      ["BGL2"])

  def _doLock(self, level, names, shared):
    """Thread helper sharing the BGL and acquiring locks at another level.

    Puts C{"DONE"} into C{self.done} once the locks are held (they are
    released right afterwards) or C{"ERR"} if a L{errors.LockError} occurs.

    @param level: locking level to acquire at
    @param names: lock name(s) to acquire at that level
    @param shared: value passed as C{shared} to the acquire

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
      self.GL.acquire(level, names, shared=shared)
      self.done.put("DONE")
      self.GL.release(level)
      self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put("ERR")

  @_Repeat
  def testConcurrency(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)

    # No instance lock is held here, the thread gets "i1" without waiting
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")

    # Holding "i3" must not affect another thread asking for "i1"
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")

    # A request for "i3" itself can only succeed after we released it
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i3", 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")

    # While we hold "i2" in shared mode, another shared acquire goes through
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i2", 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")

    # ... whereas an exclusive acquire has to wait for our release
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, "i2", 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), "DONE")
    self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
2006
2007
class TestLockMonitor(_ThreadedTestCase):
  """Tests for C{locking.LockMonitor}

  Every test gets a fresh monitor in C{self.lm}; locks are attached to it
  through the C{monitor} argument of C{locking.SharedLock} or, for fake
  locks, through C{RegisterLock}.

  """
  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()

  def testSingleThread(self):
    lock_count = 100

    locks = [locking.SharedLock("TestLock%s" % idx, monitor=self.lm)
             for idx in range(lock_count)]

    self.assertEqual(len(self.lm._locks), len(locks))
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
    self.assertEqual(len(result.fields), 1)
    self.assertEqual(len(result.data), lock_count)

    # Delete all locks
    del locks[:]

    # The garbage collector might need some time
    def _CheckLocks():
      """Request another retry for as long as the monitor knows any lock"""
      if self.lm._locks:
        raise utils.RetryAgain()

    utils.Retry(_CheckLocks, 0.1, 30.0)

    self.assertFalse(self.lm._locks)

  def testMultiThread(self):
    locks = []

    def _CreateLock(prev_ev, next_ev, name):
      """Create a monitored lock once C{prev_ev} is set, then set C{next_ev}.

      Chaining the threads through the events makes the locks get created
      in the order in which the threads were added.

      """
      prev_ev.wait()
      locks.append(locking.SharedLock(name, monitor=self.lm))
      if next_ev:
        next_ev.set()

    def _IdleRow(name, with_pending):
      """Expected query row for a lock which isn't held by anyone"""
      row = [(constants.RS_NORMAL, name),
             (constants.RS_NORMAL, None),
             (constants.RS_NORMAL, None)]
      if with_pending:
        row.append((constants.RS_NORMAL, []))
      return row

    expnames = []

    first = threading.Event()
    prev = first

    # Use a deterministic random generator
    for i in random.Random(4263).sample(range(100), 33):
      name = "MtTestLock%s" % i
      expnames.append(name)

      ev = threading.Event()
      self._addThread(target=_CreateLock, args=(prev, ev, name))
      prev = ev

    # Add locks
    first.set()
    self._waitThreads()

    # Check order in which locks were added
    self.assertEqual([lock.name for lock in locks], expnames)

    # Check query result
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
    self.assertTrue(isinstance(result, dict))
    response = objects.QueryResponse.FromDict(result)
    self.assertEqual(response.data,
                     [_IdleRow(name, True)
                      for name in utils.NiceSort(expnames)])
    self.assertEqual(len(response.fields), 4)
    self.assertEqual(["name", "mode", "owner", "pending"],
                     [fdef.name for fdef in response.fields])

    # Test exclusive acquire
    for tlock in locks[::4]:
      tlock.acquire(shared=0)
      try:
        def _GetExpResult(name):
          """Expected row: only C{tlock} is held, by the current thread"""
          if tlock.name == name:
            return [(constants.RS_NORMAL, name),
                    (constants.RS_NORMAL, "exclusive"),
                    (constants.RS_NORMAL,
                     [threading.currentThread().getName()]),
                    (constants.RS_NORMAL, [])]
          return _IdleRow(name, True)

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [_GetExpResult(name)
                          for name in utils.NiceSort(expnames)])
      finally:
        tlock.release()

    # Test shared acquire
    def _Acquire(lock, shared, ev, notify):
      """Hold C{lock} until C{ev} is set; C{notify} signals the acquisition"""
      lock.acquire(shared=shared)
      try:
        notify.set()
        ev.wait()
      finally:
        lock.release()

    for tlock1 in locks[::11]:
      for tlock2 in locks[::-15]:
        if tlock2 == tlock1:
          # Avoid deadlocks
          continue

        for tlock3 in locks[::10]:
          if tlock3 in (tlock2, tlock1):
            # Avoid deadlocks
            continue

          releaseev = threading.Event()

          # Acquire locks: three shared holders on the first lock, one shared
          # holder on the second and one exclusive holder on the third
          acquireev = []
          tthreads1 = []
          for _ in range(3):
            ev = threading.Event()
            tthreads1.append(self._addThread(target=_Acquire,
                                             args=(tlock1, 1, releaseev, ev)))
            acquireev.append(ev)

          ev = threading.Event()
          tthread2 = self._addThread(target=_Acquire,
                                     args=(tlock2, 1, releaseev, ev))
          acquireev.append(ev)

          ev = threading.Event()
          tthread3 = self._addThread(target=_Acquire,
                                     args=(tlock3, 0, releaseev, ev))
          acquireev.append(ev)

          try:
            # Wait for all locks to be acquired
            for acquired in acquireev:
              acquired.wait()

            # Check query result
            result = self.lm.QueryLocks(["name", "mode", "owner"])
            response = objects.QueryResponse.FromDict(result)
            for (name, mode, owner) in response.data:
              (name_status, name_value) = name
              (owner_status, owner_value) = owner

              self.assertEqual(name_status, constants.RS_NORMAL)
              self.assertEqual(owner_status, constants.RS_NORMAL)

              if name_value == tlock1.name:
                self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
                self.assertEqual(set(owner_value),
                                 set(t.getName() for t in tthreads1))
                continue

              if name_value == tlock2.name:
                self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
                self.assertEqual(owner_value, [tthread2.getName()])
                continue

              if name_value == tlock3.name:
                self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
                self.assertEqual(owner_value, [tthread3.getName()])
                continue

              # All remaining locks must be known and unowned
              self.assertTrue(name_value in expnames)
              self.assertEqual(mode, (constants.RS_NORMAL, None))
              self.assertTrue(owner_value is None)
          finally:
            # Release locks again; done even if a check failed as the
            # threads would otherwise wait for the event forever
            releaseev.set()

          self._waitThreads()

          result = self.lm.QueryLocks(["name", "mode", "owner"])
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
                           [_IdleRow(name, False)
                            for name in utils.NiceSort(expnames)])

  def testDelete(self):
    lock = locking.SharedLock("TestLock", monitor=self.lm)

    self.assertEqual(len(self.lm._locks), 1)
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]])

    lock.delete()

    # A deleted lock is still listed as long as the object exists, with its
    # mode reported as "deleted"
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]])
    self.assertEqual(len(self.lm._locks), 1)

  def testPending(self):
    def _Acquire(lock, shared, prev_ev, next_ev):
      """Acquire and immediately release C{lock} once C{prev_ev} is set.

      C{next_ev} is set through the acquire's C{test_notify} hook, letting
      the following thread issue its acquire.

      """
      prev_ev.wait()

      lock.acquire(shared=shared, test_notify=next_ev.set)
      lock.release()

    lock = locking.SharedLock("ExcLock", monitor=self.lm)

    # First round queues exclusive acquires, second round shared ones
    for shared in [0, 1]:
      lock.acquire()
      try:
        self.assertEqual(len(self.lm._locks), 1)
        result = self.lm.QueryLocks(["name", "mode", "owner"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()])]])

        threads = []

        first = threading.Event()
        prev = first

        for _ in range(5):
          ev = threading.Event()
          threads.append(self._addThread(target=_Acquire,
                                         args=(lock, shared, prev, ev)))
          prev = ev

        # Start acquires
        first.set()

        # Wait for last acquire to start waiting
        prev.wait()

        # NOTE: This works only because QueryLocks will acquire the
        # lock-internal lock again and won't be able to get the information
        # until it has the lock. By then the acquire should be registered in
        # SharedLock.__pending (otherwise it's a bug).

        # All acquires are waiting now; shared waiters are reported as one
        # group, exclusive waiters as one entry per thread
        if shared:
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
        else:
          pending = [("exclusive", [t.getName()]) for t in threads]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()]),
                           (constants.RS_NORMAL, pending)]])

        self.assertEqual(len(self.lm._locks), 1)
      finally:
        lock.release()

      self._waitThreads()

      # No pending acquires
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       [[(constants.RS_NORMAL, lock.name),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, [])]])

      self.assertEqual(len(self.lm._locks), 1)

  def testDeleteAndRecreate(self):
    lname = "TestLock101923193"

    def _Row(mode, owner):
      """Expected query row for a lock named C{lname}"""
      return [(constants.RS_NORMAL, lname),
              (constants.RS_NORMAL, mode),
              (constants.RS_NORMAL, owner)]

    # Create some locks with the same name and keep all references
    locks = [locking.SharedLock(lname, monitor=self.lm)
             for _ in range(5)]

    self.assertEqual(len(self.lm._locks), len(locks))

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [_Row(None, None)] * 5)

    locks[2].delete()

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [_Row(None, None)] * 2 +
                     [_Row("deleted", None)] +
                     [_Row(None, None)] * 2)

    locks[1].acquire(shared=0)

    last_status = [
      _Row(None, None),
      _Row("exclusive", [threading.currentThread().getName()]),
      _Row("deleted", None),
      _Row(None, None),
      _Row(None, None),
      ]

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)

    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
    self.assertEqual(len(self.lm._locks), len(locks))

    # Check lock deletion: dropping the references one by one, oldest first,
    # must make the corresponding rows disappear from the front
    for idx in range(len(locks)):
      del locks[0]
      self.assertTrue(gc.isenabled())
      gc.collect()
      self.assertEqual(len(self.lm._locks), len(locks))
      result = self.lm.QueryLocks(["name", "mode", "owner"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       last_status[idx + 1:])

    # All locks should have been deleted
    self.assertFalse(locks)
    self.assertFalse(self.lm._locks)

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])

  class _FakeLock(object):
    """Lock stand-in replaying prepared C{GetLockInfo} results

    Results are queued with L{AddResult} and handed out in the same order.

    """
    def __init__(self):
      # Queue of (expected request, result) tuples
      self._info = []

    def AddResult(self, *args):
      """Queue one C{(expected request, result)} pair"""
      self._info.append(args)

    def CountPending(self):
      """Return the number of queued results not handed out yet"""
      return len(self._info)

    def GetLockInfo(self, requested):
      """Return the oldest queued result.

      @param requested: the request; must equal the expectation queued
        together with the result
      @raise Exception: if C{requested} differs from the expectation

      """
      (exp_requested, result) = self._info.pop(0)

      if exp_requested != requested:
        raise Exception("Requested information (%s) does not match"
                        " expectations (%s)" % (requested, exp_requested))

      return result

  def testMultipleResults(self):
    fl1 = self._FakeLock()
    fl2 = self._FakeLock()
    fakes = [fl1, fl2]

    self.lm.RegisterLock(fl1)
    self.lm.RegisterLock(fl2)

    # Ways of arranging the prepared entries before handing them out
    orderings = [lambda x: x, reversed, sorted]

    # Empty information
    for fake in fakes:
      fake.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
    for fake in fakes:
      self.assertEqual(fake.CountPending(), 0)

    # Check ordering
    for fn in orderings:
      fl1.AddResult(set(), list(fn([
        ("aaa", None, None, None),
        ("bbb", None, None, None),
        ])))
      fl2.AddResult(set(), [])
      result = self.lm.QueryLocks(["name"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
        [(constants.RS_NORMAL, "aaa")],
        [(constants.RS_NORMAL, "bbb")],
        ])
      for fake in fakes:
        self.assertEqual(fake.CountPending(), 0)

      for fn2 in orderings:
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
          # Same name, but different information
          ("aaa", "mode0", None, None),
          ("aaa", "mode1", None, None),
          ("aaa", "mode2", None, None),
          ("aaa", "mode3", None, None),
          ])))
        fl2.AddResult(set([query.LQ_MODE]), [
          ("zzz", "end", None, None),
          ("000", "start", None, None),
          ] + list(fn2([
          ("aaa", "b200", None, None),
          ("aaa", "b300", None, None),
          ])))
        result = self.lm.QueryLocks(["name", "mode"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
          ] + list(fn([
          # Name is the same, so order must be equal to incoming order
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
          ])) + list(fn2([
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
          ])) + [
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
          ])
        for fake in fakes:
          self.assertEqual(fake.CountPending(), 0)
2440
2441
# Only run the tests when this file is executed as a script, not when it is
# imported; testutils.GanetiTestProgram is the project's test entry point
# (presumably a unittest runner wrapper -- see testutils)
if __name__ == "__main__":
  testutils.GanetiTestProgram()