Replace all xrange() with range()
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30
31 from ganeti import locking
32 from ganeti import errors
33
34
35 # This is used to test the ssynchronize decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
38
39 #: List for looping tests
40 ITERATIONS = range(8)
41
42
43 def _Repeat(fn):
44   """Decorator for executing a function many times"""
45   def wrapper(*args, **kwargs):
46     for i in ITERATIONS:
47       fn(*args, **kwargs)
48   return wrapper
49
50
51 class _ThreadedTestCase(unittest.TestCase):
52   """Test class that supports adding/waiting on threads"""
53   def setUp(self):
54     unittest.TestCase.setUp(self)
55     self.done = Queue.Queue(0)
56     self.threads = []
57
58   def _addThread(self, *args, **kwargs):
59     """Create and remember a new thread"""
60     t = threading.Thread(*args, **kwargs)
61     self.threads.append(t)
62     t.start()
63     return t
64
65   def _waitThreads(self):
66     """Wait for all our threads to finish"""
67     for t in self.threads:
68       t.join(60)
69       self.failIf(t.isAlive())
70     self.threads = []
71
72
73 class _ConditionTestCase(_ThreadedTestCase):
74   """Common test case for conditions"""
75
76   def setUp(self, cls):
77     _ThreadedTestCase.setUp(self)
78     self.lock = threading.Lock()
79     self.cond = cls(self.lock)
80
81   def _testAcquireRelease(self):
82     self.assert_(not self.cond._is_owned())
83     self.assertRaises(RuntimeError, self.cond.wait)
84     self.assertRaises(RuntimeError, self.cond.notifyAll)
85
86     self.cond.acquire()
87     self.assert_(self.cond._is_owned())
88     self.cond.notifyAll()
89     self.assert_(self.cond._is_owned())
90     self.cond.release()
91
92     self.assert_(not self.cond._is_owned())
93     self.assertRaises(RuntimeError, self.cond.wait)
94     self.assertRaises(RuntimeError, self.cond.notifyAll)
95
96   def _testNotification(self):
97     def _NotifyAll():
98       self.done.put("NE")
99       self.cond.acquire()
100       self.done.put("NA")
101       self.cond.notifyAll()
102       self.done.put("NN")
103       self.cond.release()
104
105     self.cond.acquire()
106     self._addThread(target=_NotifyAll)
107     self.assertEqual(self.done.get(True, 1), "NE")
108     self.assertRaises(Queue.Empty, self.done.get_nowait)
109     self.cond.wait()
110     self.assertEqual(self.done.get(True, 1), "NA")
111     self.assertEqual(self.done.get(True, 1), "NN")
112     self.assert_(self.cond._is_owned())
113     self.cond.release()
114     self.assert_(not self.cond._is_owned())
115
116
117 class TestSingleNotifyPipeCondition(_ConditionTestCase):
118   """SingleNotifyPipeCondition tests"""
119
120   def setUp(self):
121     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
122
123   def testAcquireRelease(self):
124     self._testAcquireRelease()
125
126   def testNotification(self):
127     self._testNotification()
128
129   def testWaitReuse(self):
130     self.cond.acquire()
131     self.cond.wait(0)
132     self.cond.wait(0.1)
133     self.cond.release()
134
135   def testNoNotifyReuse(self):
136     self.cond.acquire()
137     self.cond.notifyAll()
138     self.assertRaises(RuntimeError, self.cond.wait)
139     self.assertRaises(RuntimeError, self.cond.notifyAll)
140     self.cond.release()
141
142
143 class TestPipeCondition(_ConditionTestCase):
144   """PipeCondition tests"""
145
146   def setUp(self):
147     _ConditionTestCase.setUp(self, locking.PipeCondition)
148
149   def testAcquireRelease(self):
150     self._testAcquireRelease()
151
152   def testNotification(self):
153     self._testNotification()
154
155   def _TestWait(self, fn):
156     self._addThread(target=fn)
157     self._addThread(target=fn)
158     self._addThread(target=fn)
159
160     # Wait for threads to be waiting
161     self.assertEqual(self.done.get(True, 1), "A")
162     self.assertEqual(self.done.get(True, 1), "A")
163     self.assertEqual(self.done.get(True, 1), "A")
164
165     self.assertRaises(Queue.Empty, self.done.get_nowait)
166
167     self.cond.acquire()
168     self.assertEqual(self.cond._nwaiters, 3)
169     # This new thread can"t acquire the lock, and thus call wait, before we
170     # release it
171     self._addThread(target=fn)
172     self.cond.notifyAll()
173     self.assertRaises(Queue.Empty, self.done.get_nowait)
174     self.cond.release()
175
176     # We should now get 3 W and 1 A (for the new thread) in whatever order
177     w = 0
178     a = 0
179     for i in range(4):
180       got = self.done.get(True, 1)
181       if got == "W":
182         w += 1
183       elif got == "A":
184         a += 1
185       else:
186         self.fail("Got %s on the done queue" % got)
187
188     self.assertEqual(w, 3)
189     self.assertEqual(a, 1)
190
191     self.cond.acquire()
192     self.cond.notifyAll()
193     self.cond.release()
194     self._waitThreads()
195     self.assertEqual(self.done.get_nowait(), "W")
196     self.assertRaises(Queue.Empty, self.done.get_nowait)
197
198   def testBlockingWait(self):
199     def _BlockingWait():
200       self.cond.acquire()
201       self.done.put("A")
202       self.cond.wait()
203       self.cond.release()
204       self.done.put("W")
205
206     self._TestWait(_BlockingWait)
207
208   def testLongTimeoutWait(self):
209     def _Helper():
210       self.cond.acquire()
211       self.done.put("A")
212       self.cond.wait(15.0)
213       self.cond.release()
214       self.done.put("W")
215
216     self._TestWait(_Helper)
217
218   def _TimeoutWait(self, timeout, check):
219     self.cond.acquire()
220     self.cond.wait(timeout)
221     self.cond.release()
222     self.done.put(check)
223
224   def testShortTimeoutWait(self):
225     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
226     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
227     self._waitThreads()
228     self.assertEqual(self.done.get_nowait(), "T1")
229     self.assertEqual(self.done.get_nowait(), "T1")
230     self.assertRaises(Queue.Empty, self.done.get_nowait)
231
232   def testZeroTimeoutWait(self):
233     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
234     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
235     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
236     self._waitThreads()
237     self.assertEqual(self.done.get_nowait(), "T0")
238     self.assertEqual(self.done.get_nowait(), "T0")
239     self.assertEqual(self.done.get_nowait(), "T0")
240     self.assertRaises(Queue.Empty, self.done.get_nowait)
241
242
243 class TestSharedLock(_ThreadedTestCase):
244   """SharedLock tests"""
245
246   def setUp(self):
247     _ThreadedTestCase.setUp(self)
248     self.sl = locking.SharedLock()
249
250   def testSequenceAndOwnership(self):
251     self.assert_(not self.sl._is_owned())
252     self.sl.acquire(shared=1)
253     self.assert_(self.sl._is_owned())
254     self.assert_(self.sl._is_owned(shared=1))
255     self.assert_(not self.sl._is_owned(shared=0))
256     self.sl.release()
257     self.assert_(not self.sl._is_owned())
258     self.sl.acquire()
259     self.assert_(self.sl._is_owned())
260     self.assert_(not self.sl._is_owned(shared=1))
261     self.assert_(self.sl._is_owned(shared=0))
262     self.sl.release()
263     self.assert_(not self.sl._is_owned())
264     self.sl.acquire(shared=1)
265     self.assert_(self.sl._is_owned())
266     self.assert_(self.sl._is_owned(shared=1))
267     self.assert_(not self.sl._is_owned(shared=0))
268     self.sl.release()
269     self.assert_(not self.sl._is_owned())
270
271   def testBooleanValue(self):
272     # semaphores are supposed to return a true value on a successful acquire
273     self.assert_(self.sl.acquire(shared=1))
274     self.sl.release()
275     self.assert_(self.sl.acquire())
276     self.sl.release()
277
278   def testDoubleLockingStoE(self):
279     self.sl.acquire(shared=1)
280     self.assertRaises(AssertionError, self.sl.acquire)
281
282   def testDoubleLockingEtoS(self):
283     self.sl.acquire()
284     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
285
286   def testDoubleLockingStoS(self):
287     self.sl.acquire(shared=1)
288     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
289
290   def testDoubleLockingEtoE(self):
291     self.sl.acquire()
292     self.assertRaises(AssertionError, self.sl.acquire)
293
294   # helper functions: called in a separate thread they acquire the lock, send
295   # their identifier on the done queue, then release it.
296   def _doItSharer(self):
297     try:
298       self.sl.acquire(shared=1)
299       self.done.put('SHR')
300       self.sl.release()
301     except errors.LockError:
302       self.done.put('ERR')
303
304   def _doItExclusive(self):
305     try:
306       self.sl.acquire()
307       self.done.put('EXC')
308       self.sl.release()
309     except errors.LockError:
310       self.done.put('ERR')
311
312   def _doItDelete(self):
313     try:
314       self.sl.delete()
315       self.done.put('DEL')
316     except errors.LockError:
317       self.done.put('ERR')
318
319   def testSharersCanCoexist(self):
320     self.sl.acquire(shared=1)
321     threading.Thread(target=self._doItSharer).start()
322     self.assert_(self.done.get(True, 1))
323     self.sl.release()
324
325   @_Repeat
326   def testExclusiveBlocksExclusive(self):
327     self.sl.acquire()
328     self._addThread(target=self._doItExclusive)
329     self.assertRaises(Queue.Empty, self.done.get_nowait)
330     self.sl.release()
331     self._waitThreads()
332     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
333
334   @_Repeat
335   def testExclusiveBlocksDelete(self):
336     self.sl.acquire()
337     self._addThread(target=self._doItDelete)
338     self.assertRaises(Queue.Empty, self.done.get_nowait)
339     self.sl.release()
340     self._waitThreads()
341     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
342     self.sl = locking.SharedLock()
343
344   @_Repeat
345   def testExclusiveBlocksSharer(self):
346     self.sl.acquire()
347     self._addThread(target=self._doItSharer)
348     self.assertRaises(Queue.Empty, self.done.get_nowait)
349     self.sl.release()
350     self._waitThreads()
351     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
352
353   @_Repeat
354   def testSharerBlocksExclusive(self):
355     self.sl.acquire(shared=1)
356     self._addThread(target=self._doItExclusive)
357     self.assertRaises(Queue.Empty, self.done.get_nowait)
358     self.sl.release()
359     self._waitThreads()
360     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
361
362   @_Repeat
363   def testSharerBlocksDelete(self):
364     self.sl.acquire(shared=1)
365     self._addThread(target=self._doItDelete)
366     self.assertRaises(Queue.Empty, self.done.get_nowait)
367     self.sl.release()
368     self._waitThreads()
369     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
370     self.sl = locking.SharedLock()
371
372   @_Repeat
373   def testWaitingExclusiveBlocksSharer(self):
374     """SKIPPED testWaitingExclusiveBlockSharer"""
375     return
376
377     self.sl.acquire(shared=1)
378     # the lock is acquired in shared mode...
379     self._addThread(target=self._doItExclusive)
380     # ...but now an exclusive is waiting...
381     self._addThread(target=self._doItSharer)
382     # ...so the sharer should be blocked as well
383     self.assertRaises(Queue.Empty, self.done.get_nowait)
384     self.sl.release()
385     self._waitThreads()
386     # The exclusive passed before
387     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
388     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
389
390   @_Repeat
391   def testWaitingSharerBlocksExclusive(self):
392     """SKIPPED testWaitingSharerBlocksExclusive"""
393     return
394
395     self.sl.acquire()
396     # the lock is acquired in exclusive mode...
397     self._addThread(target=self._doItSharer)
398     # ...but now a sharer is waiting...
399     self._addThread(target=self._doItExclusive)
400     # ...the exclusive is waiting too...
401     self.assertRaises(Queue.Empty, self.done.get_nowait)
402     self.sl.release()
403     self._waitThreads()
404     # The sharer passed before
405     self.assertEqual(self.done.get_nowait(), 'SHR')
406     self.assertEqual(self.done.get_nowait(), 'EXC')
407
408   def testDelete(self):
409     self.sl.delete()
410     self.assertRaises(errors.LockError, self.sl.acquire)
411     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
412     self.assertRaises(errors.LockError, self.sl.delete)
413
414   def testDeleteTimeout(self):
415     self.sl.delete(timeout=60)
416
417   def testNoDeleteIfSharer(self):
418     self.sl.acquire(shared=1)
419     self.assertRaises(AssertionError, self.sl.delete)
420
421   @_Repeat
422   def testDeletePendingSharersExclusiveDelete(self):
423     self.sl.acquire()
424     self._addThread(target=self._doItSharer)
425     self._addThread(target=self._doItSharer)
426     self._addThread(target=self._doItExclusive)
427     self._addThread(target=self._doItDelete)
428     self.sl.delete()
429     self._waitThreads()
430     # The threads who were pending return ERR
431     for _ in range(4):
432       self.assertEqual(self.done.get_nowait(), 'ERR')
433     self.sl = locking.SharedLock()
434
435   @_Repeat
436   def testDeletePendingDeleteExclusiveSharers(self):
437     self.sl.acquire()
438     self._addThread(target=self._doItDelete)
439     self._addThread(target=self._doItExclusive)
440     self._addThread(target=self._doItSharer)
441     self._addThread(target=self._doItSharer)
442     self.sl.delete()
443     self._waitThreads()
444     # The two threads who were pending return both ERR
445     self.assertEqual(self.done.get_nowait(), 'ERR')
446     self.assertEqual(self.done.get_nowait(), 'ERR')
447     self.assertEqual(self.done.get_nowait(), 'ERR')
448     self.assertEqual(self.done.get_nowait(), 'ERR')
449     self.sl = locking.SharedLock()
450
451   @_Repeat
452   def testExclusiveAcquireTimeout(self):
453     for shared in [0, 1]:
454       on_queue = threading.Event()
455       release_exclusive = threading.Event()
456
457       def _LockExclusive():
458         self.sl.acquire(shared=0, test_notify=on_queue.set)
459         self.done.put("A: start wait")
460         release_exclusive.wait()
461         self.done.put("A: end wait")
462         self.sl.release()
463
464       # Start thread to hold lock in exclusive mode
465       self._addThread(target=_LockExclusive)
466
467       # Wait for wait to begin
468       self.assertEqual(self.done.get(timeout=60), "A: start wait")
469
470       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
471       # on the queue
472       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
473                                       test_notify=release_exclusive.set))
474
475       self.done.put("got 2nd")
476       self.sl.release()
477
478       self._waitThreads()
479
480       self.assertEqual(self.done.get_nowait(), "A: end wait")
481       self.assertEqual(self.done.get_nowait(), "got 2nd")
482       self.assertRaises(Queue.Empty, self.done.get_nowait)
483
484   @_Repeat
485   def testAcquireExpiringTimeout(self):
486     def _AcquireWithTimeout(shared, timeout):
487       if not self.sl.acquire(shared=shared, timeout=timeout):
488         self.done.put("timeout")
489
490     for shared in [0, 1]:
491       # Lock exclusively
492       self.sl.acquire()
493
494       # Start shared acquires with timeout between 0 and 20 ms
495       for i in range(11):
496         self._addThread(target=_AcquireWithTimeout,
497                         args=(shared, i * 2.0 / 1000.0))
498
499       # Wait for threads to finish (makes sure the acquire timeout expires
500       # before releasing the lock)
501       self._waitThreads()
502
503       # Release lock
504       self.sl.release()
505
506       for _ in range(11):
507         self.assertEqual(self.done.get_nowait(), "timeout")
508
509       self.assertRaises(Queue.Empty, self.done.get_nowait)
510
511   @_Repeat
512   def testSharedSkipExclusiveAcquires(self):
513     # Tests whether shared acquires jump in front of exclusive acquires in the
514     # queue.
515
516     def _Acquire(shared, name, notify_ev, wait_ev):
517       if notify_ev:
518         notify_fn = notify_ev.set
519       else:
520         notify_fn = None
521
522       if wait_ev:
523         wait_ev.wait()
524
525       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
526         return
527
528       self.done.put(name)
529       self.sl.release()
530
531     # Get exclusive lock while we fill the queue
532     self.sl.acquire()
533
534     shrcnt1 = 5
535     shrcnt2 = 7
536     shrcnt3 = 9
537     shrcnt4 = 2
538
539     # Add acquires using threading.Event for synchronization. They'll be
540     # acquired exactly in the order defined in this list.
541     acquires = (shrcnt1 * [(1, "shared 1")] +
542                 3 * [(0, "exclusive 1")] +
543                 shrcnt2 * [(1, "shared 2")] +
544                 shrcnt3 * [(1, "shared 3")] +
545                 shrcnt4 * [(1, "shared 4")] +
546                 3 * [(0, "exclusive 2")])
547
548     ev_cur = None
549     ev_prev = None
550
551     for args in acquires:
552       ev_cur = threading.Event()
553       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
554       ev_prev = ev_cur
555
556     # Wait for last acquire to start
557     ev_prev.wait()
558
559     # Expect 6 pending exclusive acquires and 1 for all shared acquires
560     # together
561     self.assertEqual(self.sl._count_pending(), 7)
562
563     # Release exclusive lock and wait
564     self.sl.release()
565
566     self._waitThreads()
567
568     # Check sequence
569     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
570       # Shared locks aren't guaranteed to be notified in order, but they'll be
571       # first
572       tmp = self.done.get_nowait()
573       if tmp == "shared 1":
574         shrcnt1 -= 1
575       elif tmp == "shared 2":
576         shrcnt2 -= 1
577       elif tmp == "shared 3":
578         shrcnt3 -= 1
579       elif tmp == "shared 4":
580         shrcnt4 -= 1
581     self.assertEqual(shrcnt1, 0)
582     self.assertEqual(shrcnt2, 0)
583     self.assertEqual(shrcnt3, 0)
584     self.assertEqual(shrcnt3, 0)
585
586     for _ in range(3):
587       self.assertEqual(self.done.get_nowait(), "exclusive 1")
588
589     for _ in range(3):
590       self.assertEqual(self.done.get_nowait(), "exclusive 2")
591
592     self.assertRaises(Queue.Empty, self.done.get_nowait)
593
594   @_Repeat
595   def testMixedAcquireTimeout(self):
596     sync = threading.Condition()
597
598     def _AcquireShared(ev):
599       if not self.sl.acquire(shared=1, timeout=None):
600         return
601
602       self.done.put("shared")
603
604       # Notify main thread
605       ev.set()
606
607       # Wait for notification
608       sync.acquire()
609       try:
610         sync.wait()
611       finally:
612         sync.release()
613
614       # Release lock
615       self.sl.release()
616
617     acquires = []
618     for _ in range(3):
619       ev = threading.Event()
620       self._addThread(target=_AcquireShared, args=(ev, ))
621       acquires.append(ev)
622
623     # Wait for all acquires to finish
624     for i in acquires:
625       i.wait()
626
627     self.assertEqual(self.sl._count_pending(), 0)
628
629     # Try to get exclusive lock
630     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
631
632     # Acquire exclusive without timeout
633     exclsync = threading.Condition()
634     exclev = threading.Event()
635
636     def _AcquireExclusive():
637       if not self.sl.acquire(shared=0):
638         return
639
640       self.done.put("exclusive")
641
642       # Notify main thread
643       exclev.set()
644
645       exclsync.acquire()
646       try:
647         exclsync.wait()
648       finally:
649         exclsync.release()
650
651       self.sl.release()
652
653     self._addThread(target=_AcquireExclusive)
654
655     # Try to get exclusive lock
656     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
657
658     # Make all shared holders release their locks
659     sync.acquire()
660     try:
661       sync.notifyAll()
662     finally:
663       sync.release()
664
665     # Wait for exclusive acquire to succeed
666     exclev.wait()
667
668     self.assertEqual(self.sl._count_pending(), 0)
669
670     # Try to get exclusive lock
671     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
672
673     def _AcquireSharedSimple():
674       if self.sl.acquire(shared=1, timeout=None):
675         self.done.put("shared2")
676         self.sl.release()
677
678     for _ in range(10):
679       self._addThread(target=_AcquireSharedSimple)
680
681     # Tell exclusive lock to release
682     exclsync.acquire()
683     try:
684       exclsync.notifyAll()
685     finally:
686       exclsync.release()
687
688     # Wait for everything to finish
689     self._waitThreads()
690
691     self.assertEqual(self.sl._count_pending(), 0)
692
693     # Check sequence
694     for _ in range(3):
695       self.assertEqual(self.done.get_nowait(), "shared")
696
697     self.assertEqual(self.done.get_nowait(), "exclusive")
698
699     for _ in range(10):
700       self.assertEqual(self.done.get_nowait(), "shared2")
701
702     self.assertRaises(Queue.Empty, self.done.get_nowait)
703
704
705 class TestSSynchronizedDecorator(_ThreadedTestCase):
706   """Shared Lock Synchronized decorator test"""
707
708   def setUp(self):
709     _ThreadedTestCase.setUp(self)
710
711   @locking.ssynchronized(_decoratorlock)
712   def _doItExclusive(self):
713     self.assert_(_decoratorlock._is_owned())
714     self.done.put('EXC')
715
716   @locking.ssynchronized(_decoratorlock, shared=1)
717   def _doItSharer(self):
718     self.assert_(_decoratorlock._is_owned(shared=1))
719     self.done.put('SHR')
720
721   def testDecoratedFunctions(self):
722     self._doItExclusive()
723     self.assert_(not _decoratorlock._is_owned())
724     self._doItSharer()
725     self.assert_(not _decoratorlock._is_owned())
726
727   def testSharersCanCoexist(self):
728     _decoratorlock.acquire(shared=1)
729     threading.Thread(target=self._doItSharer).start()
730     self.assert_(self.done.get(True, 1))
731     _decoratorlock.release()
732
733   @_Repeat
734   def testExclusiveBlocksExclusive(self):
735     _decoratorlock.acquire()
736     self._addThread(target=self._doItExclusive)
737     # give it a bit of time to check that it's not actually doing anything
738     self.assertRaises(Queue.Empty, self.done.get_nowait)
739     _decoratorlock.release()
740     self._waitThreads()
741     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
742
743   @_Repeat
744   def testExclusiveBlocksSharer(self):
745     _decoratorlock.acquire()
746     self._addThread(target=self._doItSharer)
747     self.assertRaises(Queue.Empty, self.done.get_nowait)
748     _decoratorlock.release()
749     self._waitThreads()
750     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
751
752   @_Repeat
753   def testSharerBlocksExclusive(self):
754     _decoratorlock.acquire(shared=1)
755     self._addThread(target=self._doItExclusive)
756     self.assertRaises(Queue.Empty, self.done.get_nowait)
757     _decoratorlock.release()
758     self._waitThreads()
759     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
760
761
762 class TestLockSet(_ThreadedTestCase):
763   """LockSet tests"""
764
765   def setUp(self):
766     _ThreadedTestCase.setUp(self)
767     self._setUpLS()
768
769   def _setUpLS(self):
770     """Helper to (re)initialize the lock set"""
771     self.resources = ['one', 'two', 'three']
772     self.ls = locking.LockSet(members=self.resources)
773
774   def testResources(self):
775     self.assertEquals(self.ls._names(), set(self.resources))
776     newls = locking.LockSet()
777     self.assertEquals(newls._names(), set())
778
779   def testAcquireRelease(self):
780     self.assert_(self.ls.acquire('one'))
781     self.assertEquals(self.ls._list_owned(), set(['one']))
782     self.ls.release()
783     self.assertEquals(self.ls._list_owned(), set())
784     self.assertEquals(self.ls.acquire(['one']), set(['one']))
785     self.assertEquals(self.ls._list_owned(), set(['one']))
786     self.ls.release()
787     self.assertEquals(self.ls._list_owned(), set())
788     self.ls.acquire(['one', 'two', 'three'])
789     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
790     self.ls.release('one')
791     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
792     self.ls.release(['three'])
793     self.assertEquals(self.ls._list_owned(), set(['two']))
794     self.ls.release()
795     self.assertEquals(self.ls._list_owned(), set())
796     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
797     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
798     self.ls.release()
799     self.assertEquals(self.ls._list_owned(), set())
800
801   def testNoDoubleAcquire(self):
802     self.ls.acquire('one')
803     self.assertRaises(AssertionError, self.ls.acquire, 'one')
804     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
805     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
806     self.ls.release()
807     self.ls.acquire(['one', 'three'])
808     self.ls.release('one')
809     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
810     self.ls.release('three')
811
812   def testNoWrongRelease(self):
813     self.assertRaises(AssertionError, self.ls.release)
814     self.ls.acquire('one')
815     self.assertRaises(AssertionError, self.ls.release, 'two')
816
817   def testAddRemove(self):
818     self.ls.add('four')
819     self.assertEquals(self.ls._list_owned(), set())
820     self.assert_('four' in self.ls._names())
821     self.ls.add(['five', 'six', 'seven'], acquired=1)
822     self.assert_('five' in self.ls._names())
823     self.assert_('six' in self.ls._names())
824     self.assert_('seven' in self.ls._names())
825     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
826     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
827     self.assert_('five' not in self.ls._names())
828     self.assert_('six' not in self.ls._names())
829     self.assertEquals(self.ls._list_owned(), set(['seven']))
830     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
831     self.ls.remove('seven')
832     self.assert_('seven' not in self.ls._names())
833     self.assertEquals(self.ls._list_owned(), set([]))
834     self.ls.acquire(None, shared=1)
835     self.assertRaises(AssertionError, self.ls.add, 'eight')
836     self.ls.release()
837     self.ls.acquire(None)
838     self.ls.add('eight', acquired=1)
839     self.assert_('eight' in self.ls._names())
840     self.assert_('eight' in self.ls._list_owned())
841     self.ls.add('nine')
842     self.assert_('nine' in self.ls._names())
843     self.assert_('nine' not in self.ls._list_owned())
844     self.ls.release()
845     self.ls.remove(['two'])
846     self.assert_('two' not in self.ls._names())
847     self.ls.acquire('three')
848     self.assertEquals(self.ls.remove(['three']), ['three'])
849     self.assert_('three' not in self.ls._names())
850     self.assertEquals(self.ls.remove('three'), [])
851     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
852     self.assert_('one' not in self.ls._names())
853
854   def testRemoveNonBlocking(self):
855     self.ls.acquire('one')
856     self.assertEquals(self.ls.remove('one'), ['one'])
857     self.ls.acquire(['two', 'three'])
858     self.assertEquals(self.ls.remove(['two', 'three']),
859                       ['two', 'three'])
860
861   def testNoDoubleAdd(self):
862     self.assertRaises(errors.LockError, self.ls.add, 'two')
863     self.ls.add('four')
864     self.assertRaises(errors.LockError, self.ls.add, 'four')
865
866   def testNoWrongRemoves(self):
867     self.ls.acquire(['one', 'three'], shared=1)
868     # Cannot remove 'two' while holding something which is not a superset
869     self.assertRaises(AssertionError, self.ls.remove, 'two')
870     # Cannot remove 'three' as we are sharing it
871     self.assertRaises(AssertionError, self.ls.remove, 'three')
872
873   def testAcquireSetLock(self):
874     # acquire the set-lock exclusively
875     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
876     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
877     self.assertEquals(self.ls._is_owned(), True)
878     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
879     # I can still add/remove elements...
880     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
881     self.assert_(self.ls.add('six'))
882     self.ls.release()
883     # share the set-lock
884     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
885     # adding new elements is not possible
886     self.assertRaises(AssertionError, self.ls.add, 'five')
887     self.ls.release()
888
889   def testAcquireWithRepetitions(self):
890     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
891                       set(['two', 'two', 'three']))
892     self.ls.release(['two', 'two'])
893     self.assertEquals(self.ls._list_owned(), set(['three']))
894
895   def testEmptyAcquire(self):
896     # Acquire an empty list of locks...
897     self.assertEquals(self.ls.acquire([]), set())
898     self.assertEquals(self.ls._list_owned(), set())
899     # New locks can still be addded
900     self.assert_(self.ls.add('six'))
901     # "re-acquiring" is not an issue, since we had really acquired nothing
902     self.assertEquals(self.ls.acquire([], shared=1), set())
903     self.assertEquals(self.ls._list_owned(), set())
904     # We haven't really acquired anything, so we cannot release
905     self.assertRaises(AssertionError, self.ls.release)
906
907   def _doLockSet(self, names, shared):
908     try:
909       self.ls.acquire(names, shared=shared)
910       self.done.put('DONE')
911       self.ls.release()
912     except errors.LockError:
913       self.done.put('ERR')
914
915   def _doAddSet(self, names):
916     try:
917       self.ls.add(names, acquired=1)
918       self.done.put('DONE')
919       self.ls.release()
920     except errors.LockError:
921       self.done.put('ERR')
922
923   def _doRemoveSet(self, names):
924     self.done.put(self.ls.remove(names))
925
926   @_Repeat
927   def testConcurrentSharedAcquire(self):
928     self.ls.acquire(['one', 'two'], shared=1)
929     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
930     self._waitThreads()
931     self.assertEqual(self.done.get_nowait(), 'DONE')
932     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
933     self._waitThreads()
934     self.assertEqual(self.done.get_nowait(), 'DONE')
935     self._addThread(target=self._doLockSet, args=('three', 1))
936     self._waitThreads()
937     self.assertEqual(self.done.get_nowait(), 'DONE')
938     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
939     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
940     self.assertRaises(Queue.Empty, self.done.get_nowait)
941     self.ls.release()
942     self._waitThreads()
943     self.assertEqual(self.done.get_nowait(), 'DONE')
944     self.assertEqual(self.done.get_nowait(), 'DONE')
945
946   @_Repeat
947   def testConcurrentExclusiveAcquire(self):
948     self.ls.acquire(['one', 'two'])
949     self._addThread(target=self._doLockSet, args=('three', 1))
950     self._waitThreads()
951     self.assertEqual(self.done.get_nowait(), 'DONE')
952     self._addThread(target=self._doLockSet, args=('three', 0))
953     self._waitThreads()
954     self.assertEqual(self.done.get_nowait(), 'DONE')
955     self.assertRaises(Queue.Empty, self.done.get_nowait)
956     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
957     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
958     self._addThread(target=self._doLockSet, args=('one', 0))
959     self._addThread(target=self._doLockSet, args=('one', 1))
960     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
961     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
962     self.assertRaises(Queue.Empty, self.done.get_nowait)
963     self.ls.release()
964     self._waitThreads()
965     for _ in range(6):
966       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
967
968   @_Repeat
969   def testConcurrentRemove(self):
970     self.ls.add('four')
971     self.ls.acquire(['one', 'two', 'four'])
972     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
973     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
974     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
975     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
976     self.assertRaises(Queue.Empty, self.done.get_nowait)
977     self.ls.remove('one')
978     self.ls.release()
979     self._waitThreads()
980     for i in range(4):
981       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
982     self.ls.add(['five', 'six'], acquired=1)
983     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
984     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
985     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
986     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
987     self.ls.remove('five')
988     self.ls.release()
989     self._waitThreads()
990     for i in range(4):
991       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
992     self.ls.acquire(['three', 'four'])
993     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
994     self.assertRaises(Queue.Empty, self.done.get_nowait)
995     self.ls.remove('four')
996     self._waitThreads()
997     self.assertEqual(self.done.get_nowait(), ['six'])
998     self._addThread(target=self._doRemoveSet, args=(['two']))
999     self._waitThreads()
1000     self.assertEqual(self.done.get_nowait(), ['two'])
1001     self.ls.release()
1002     # reset lockset
1003     self._setUpLS()
1004
1005   @_Repeat
1006   def testConcurrentSharedSetLock(self):
1007     # share the set-lock...
1008     self.ls.acquire(None, shared=1)
1009     # ...another thread can share it too
1010     self._addThread(target=self._doLockSet, args=(None, 1))
1011     self._waitThreads()
1012     self.assertEqual(self.done.get_nowait(), 'DONE')
1013     # ...or just share some elements
1014     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1015     self._waitThreads()
1016     self.assertEqual(self.done.get_nowait(), 'DONE')
1017     # ...but not add new ones or remove any
1018     t = self._addThread(target=self._doAddSet, args=(['nine']))
1019     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1020     self.assertRaises(Queue.Empty, self.done.get_nowait)
1021     # this just releases the set-lock
1022     self.ls.release([])
1023     t.join(60)
1024     self.assertEqual(self.done.get_nowait(), 'DONE')
1025     # release the lock on the actual elements so remove() can proceed too
1026     self.ls.release()
1027     self._waitThreads()
1028     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1029     # reset lockset
1030     self._setUpLS()
1031
1032   @_Repeat
1033   def testConcurrentExclusiveSetLock(self):
1034     # acquire the set-lock...
1035     self.ls.acquire(None, shared=0)
1036     # ...no one can do anything else
1037     self._addThread(target=self._doLockSet, args=(None, 1))
1038     self._addThread(target=self._doLockSet, args=(None, 0))
1039     self._addThread(target=self._doLockSet, args=(['three'], 0))
1040     self._addThread(target=self._doLockSet, args=(['two'], 1))
1041     self._addThread(target=self._doAddSet, args=(['nine']))
1042     self.assertRaises(Queue.Empty, self.done.get_nowait)
1043     self.ls.release()
1044     self._waitThreads()
1045     for _ in range(5):
1046       self.assertEqual(self.done.get(True, 1), 'DONE')
1047     # cleanup
1048     self._setUpLS()
1049
1050   @_Repeat
1051   def testConcurrentSetLockAdd(self):
1052     self.ls.acquire('one')
1053     # Another thread wants the whole SetLock
1054     self._addThread(target=self._doLockSet, args=(None, 0))
1055     self._addThread(target=self._doLockSet, args=(None, 1))
1056     self.assertRaises(Queue.Empty, self.done.get_nowait)
1057     self.assertRaises(AssertionError, self.ls.add, 'four')
1058     self.ls.release()
1059     self._waitThreads()
1060     self.assertEqual(self.done.get_nowait(), 'DONE')
1061     self.assertEqual(self.done.get_nowait(), 'DONE')
1062     self.ls.acquire(None)
1063     self._addThread(target=self._doLockSet, args=(None, 0))
1064     self._addThread(target=self._doLockSet, args=(None, 1))
1065     self.assertRaises(Queue.Empty, self.done.get_nowait)
1066     self.ls.add('four')
1067     self.ls.add('five', acquired=1)
1068     self.ls.add('six', acquired=1, shared=1)
1069     self.assertEquals(self.ls._list_owned(),
1070       set(['one', 'two', 'three', 'five', 'six']))
1071     self.assertEquals(self.ls._is_owned(), True)
1072     self.assertEquals(self.ls._names(),
1073       set(['one', 'two', 'three', 'four', 'five', 'six']))
1074     self.ls.release()
1075     self._waitThreads()
1076     self.assertEqual(self.done.get_nowait(), 'DONE')
1077     self.assertEqual(self.done.get_nowait(), 'DONE')
1078     self._setUpLS()
1079
1080   @_Repeat
1081   def testEmptyLockSet(self):
1082     # get the set-lock
1083     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1084     # now empty it...
1085     self.ls.remove(['one', 'two', 'three'])
1086     # and adds/locks by another thread still wait
1087     self._addThread(target=self._doAddSet, args=(['nine']))
1088     self._addThread(target=self._doLockSet, args=(None, 1))
1089     self._addThread(target=self._doLockSet, args=(None, 0))
1090     self.assertRaises(Queue.Empty, self.done.get_nowait)
1091     self.ls.release()
1092     self._waitThreads()
1093     for _ in range(3):
1094       self.assertEqual(self.done.get_nowait(), 'DONE')
1095     # empty it again...
1096     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1097     # now share it...
1098     self.assertEqual(self.ls.acquire(None, shared=1), set())
1099     # other sharers can go, adds still wait
1100     self._addThread(target=self._doLockSet, args=(None, 1))
1101     self._waitThreads()
1102     self.assertEqual(self.done.get_nowait(), 'DONE')
1103     self._addThread(target=self._doAddSet, args=(['nine']))
1104     self.assertRaises(Queue.Empty, self.done.get_nowait)
1105     self.ls.release()
1106     self._waitThreads()
1107     self.assertEqual(self.done.get_nowait(), 'DONE')
1108     self._setUpLS()
1109
1110
1111 class TestGanetiLockManager(_ThreadedTestCase):
1112
1113   def setUp(self):
1114     _ThreadedTestCase.setUp(self)
1115     self.nodes=['n1', 'n2']
1116     self.instances=['i1', 'i2', 'i3']
1117     self.GL = locking.GanetiLockManager(nodes=self.nodes,
1118                                         instances=self.instances)
1119
1120   def tearDown(self):
1121     # Don't try this at home...
1122     locking.GanetiLockManager._instance = None
1123
1124   def testLockingConstants(self):
1125     # The locking library internally cheats by assuming its constants have some
1126     # relationships with each other. Check those hold true.
1127     # This relationship is also used in the Processor to recursively acquire
1128     # the right locks. Again, please don't break it.
1129     for i in range(len(locking.LEVELS)):
1130       self.assertEqual(i, locking.LEVELS[i])
1131
1132   def testDoubleGLFails(self):
1133     self.assertRaises(AssertionError, locking.GanetiLockManager)
1134
1135   def testLockNames(self):
1136     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1137     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1138     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1139                      set(self.instances))
1140
1141   def testInitAndResources(self):
1142     locking.GanetiLockManager._instance = None
1143     self.GL = locking.GanetiLockManager()
1144     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1145     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1146     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1147
1148     locking.GanetiLockManager._instance = None
1149     self.GL = locking.GanetiLockManager(nodes=self.nodes)
1150     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1151     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1152     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1153
1154     locking.GanetiLockManager._instance = None
1155     self.GL = locking.GanetiLockManager(instances=self.instances)
1156     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1157     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1158     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1159                      set(self.instances))
1160
1161   def testAcquireRelease(self):
1162     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1163     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1164     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1165     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1166     self.GL.release(locking.LEVEL_NODE, ['n2'])
1167     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1168     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1169     self.GL.release(locking.LEVEL_NODE)
1170     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1171     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1172     self.GL.release(locking.LEVEL_INSTANCE)
1173     self.assertRaises(errors.LockError, self.GL.acquire,
1174                       locking.LEVEL_INSTANCE, ['i5'])
1175     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1176     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1177
1178   def testAcquireWholeSets(self):
1179     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1180     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1181                       set(self.instances))
1182     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1183                       set(self.instances))
1184     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1185                       set(self.nodes))
1186     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1187                       set(self.nodes))
1188     self.GL.release(locking.LEVEL_NODE)
1189     self.GL.release(locking.LEVEL_INSTANCE)
1190     self.GL.release(locking.LEVEL_CLUSTER)
1191
1192   def testAcquireWholeAndPartial(self):
1193     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1194     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1195                       set(self.instances))
1196     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1197                       set(self.instances))
1198     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1199                       set(['n2']))
1200     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1201                       set(['n2']))
1202     self.GL.release(locking.LEVEL_NODE)
1203     self.GL.release(locking.LEVEL_INSTANCE)
1204     self.GL.release(locking.LEVEL_CLUSTER)
1205
1206   def testBGLDependency(self):
1207     self.assertRaises(AssertionError, self.GL.acquire,
1208                       locking.LEVEL_NODE, ['n1', 'n2'])
1209     self.assertRaises(AssertionError, self.GL.acquire,
1210                       locking.LEVEL_INSTANCE, ['i3'])
1211     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1212     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1213     self.assertRaises(AssertionError, self.GL.release,
1214                       locking.LEVEL_CLUSTER, ['BGL'])
1215     self.assertRaises(AssertionError, self.GL.release,
1216                       locking.LEVEL_CLUSTER)
1217     self.GL.release(locking.LEVEL_NODE)
1218     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1219     self.assertRaises(AssertionError, self.GL.release,
1220                       locking.LEVEL_CLUSTER, ['BGL'])
1221     self.assertRaises(AssertionError, self.GL.release,
1222                       locking.LEVEL_CLUSTER)
1223     self.GL.release(locking.LEVEL_INSTANCE)
1224
1225   def testWrongOrder(self):
1226     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1227     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1228     self.assertRaises(AssertionError, self.GL.acquire,
1229                       locking.LEVEL_NODE, ['n1'])
1230     self.assertRaises(AssertionError, self.GL.acquire,
1231                       locking.LEVEL_INSTANCE, ['i2'])
1232
1233   # Helper function to run as a thread that shared the BGL and then acquires
1234   # some locks at another level.
1235   def _doLock(self, level, names, shared):
1236     try:
1237       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1238       self.GL.acquire(level, names, shared=shared)
1239       self.done.put('DONE')
1240       self.GL.release(level)
1241       self.GL.release(locking.LEVEL_CLUSTER)
1242     except errors.LockError:
1243       self.done.put('ERR')
1244
1245   @_Repeat
1246   def testConcurrency(self):
1247     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1248     self._addThread(target=self._doLock,
1249                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1250     self._waitThreads()
1251     self.assertEqual(self.done.get_nowait(), 'DONE')
1252     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1253     self._addThread(target=self._doLock,
1254                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1255     self._waitThreads()
1256     self.assertEqual(self.done.get_nowait(), 'DONE')
1257     self._addThread(target=self._doLock,
1258                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1259     self.assertRaises(Queue.Empty, self.done.get_nowait)
1260     self.GL.release(locking.LEVEL_INSTANCE)
1261     self._waitThreads()
1262     self.assertEqual(self.done.get_nowait(), 'DONE')
1263     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1264     self._addThread(target=self._doLock,
1265                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1266     self._waitThreads()
1267     self.assertEqual(self.done.get_nowait(), 'DONE')
1268     self._addThread(target=self._doLock,
1269                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1270     self.assertRaises(Queue.Empty, self.done.get_nowait)
1271     self.GL.release(locking.LEVEL_INSTANCE)
1272     self._waitThreads()
1273     self.assertEqual(self.done.get(True, 1), 'DONE')
1274     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1275
1276
1277 if __name__ == '__main__':
1278   unittest.main()
1279   #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1280   #unittest.TextTestRunner(verbosity=2).run(suite)