Abstract base condition test cases
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30
31 from ganeti import locking
32 from ganeti import errors
33
34
35 # This is used to test the ssynchronized decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
38
39 #: List for looping tests
40 ITERATIONS = range(8)
41
42
43 def _Repeat(fn):
44   """Decorator for executing a function many times"""
45   def wrapper(*args, **kwargs):
46     for i in ITERATIONS:
47       fn(*args, **kwargs)
48   return wrapper
49
50
51 class _ThreadedTestCase(unittest.TestCase):
52   """Test class that supports adding/waiting on threads"""
53   def setUp(self):
54     unittest.TestCase.setUp(self)
55     self.done = Queue.Queue(0)
56     self.threads = []
57
58   def _addThread(self, *args, **kwargs):
59     """Create and remember a new thread"""
60     t = threading.Thread(*args, **kwargs)
61     self.threads.append(t)
62     t.start()
63     return t
64
65   def _waitThreads(self):
66     """Wait for all our threads to finish"""
67     for t in self.threads:
68       t.join(60)
69       self.failIf(t.isAlive())
70     self.threads = []
71
72
class _ConditionTestCase(_ThreadedTestCase):
  """Shared checks for condition implementations.

  Concrete test classes call L{setUp} with the condition class under test
  and expose the C{_test*} helpers as real test methods.

  """

  def setUp(self, cls):
    """Create a plain lock and a condition of type C{cls} built on it.

    @param cls: condition class; called with the lock as its only argument

    """
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _checkNotOwned(self):
    """Check that the condition is unowned and refuses wait/notifyAll."""
    self.assert_(not self.cond._is_owned())
    self.assertRaises(RuntimeError, self.cond.wait)
    self.assertRaises(RuntimeError, self.cond.notifyAll)

  def _testAcquireRelease(self):
    """Check ownership before, while and after holding the condition."""
    self._checkNotOwned()

    self.cond.acquire()
    self.assert_(self.cond._is_owned())
    # Notifying must not give up ownership
    self.cond.notifyAll()
    self.assert_(self.cond._is_owned())
    self.cond.release()

    self._checkNotOwned()

  def _testNotification(self):
    """Check that a waiter is woken up by notifyAll from another thread."""
    def _NotifyAll():
      self.cond.acquire()
      self.cond.notifyAll()
      self.cond.release()

    self.cond.acquire()
    # The helper can only get the condition once wait() has released it
    self._addThread(target=_NotifyAll)
    self.cond.wait()
    self.assert_(self.cond._is_owned())
    self.cond.release()
    self.assert_(not self.cond._is_owned())
108
109
class TestPipeCondition(_ConditionTestCase):
  """_PipeCondition tests"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking._PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check the notification sequence.

    C{fn} is expected to put "A" on the done queue once it holds the
    condition and "W" after its wait has returned.

    @param fn: thread function waiting on C{self.cond}

    """
    for _ in range(3):
      self._addThread(target=fn)

    # Wait for threads to be waiting
    for _ in range(3):
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(self.cond._nwaiters, 3)
    # This new thread can't acquire the lock, and thus call wait, before we
    # release it
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # We should now get 3 W and 1 A (for the new thread) in whatever order
    seen = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in seen:
        self.fail("Got %s on the done queue" % got)
      seen[got] += 1

    self.assertEqual(seen["W"], 3)
    self.assertEqual(seen["A"], 1)

    # Wake up the thread which was started last
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testBlockingWait(self):
    def _BlockingWait():
      self.cond.acquire()
      self.done.put("A")
      self.cond.wait()
      self.cond.release()
      self.done.put("W")

    self._TestWait(_BlockingWait)

  def testLongTimeoutWait(self):
    # The timeout is long enough to never expire during the test
    def _Helper():
      self.cond.acquire()
      self.done.put("A")
      self.cond.wait(15.0)
      self.cond.release()
      self.done.put("W")

    self._TestWait(_Helper)

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with a timeout, then report C{check}.

    @param timeout: timeout passed to the condition's wait
    @param check: value put on the done queue after the wait

    """
    self.cond.acquire()
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
208
209
class TestSingleActionPipeCondition(unittest.TestCase):
  """_SingleActionPipeCondition tests"""

  def setUp(self):
    self.cond = locking._SingleActionPipeCondition()

  def _checkResources(self, read_fd, write_fd, poller):
    """Check which internal attributes of the condition are set.

    Every argument is a boolean telling whether the attribute of the same
    name (C{_read_fd}, C{_write_fd}, C{_poller}) must be set (True) or must
    be None (False).

    """
    self.assertEqual(self.cond._read_fd is not None, read_fd)
    self.assertEqual(self.cond._write_fd is not None, write_fd)
    self.assertEqual(self.cond._poller is not None, poller)

  def testInitialization(self):
    self._checkResources(True, True, True)
    self.assertEqual(self.cond._nwaiters, 0)

  def testUsageCount(self):
    self.cond.StartWaiting()
    self._checkResources(True, True, True)
    self.assertEqual(self.cond._nwaiters, 1)

    # use again
    self.cond.StartWaiting()
    self.assertEqual(self.cond._nwaiters, 2)

    # there is more than one user
    self.assert_(not self.cond.DoneWaiting())
    self._checkResources(True, True, True)
    self.assertEqual(self.cond._nwaiters, 1)

    # the last user is done, everything is given up
    self.assert_(self.cond.DoneWaiting())
    self.assertEqual(self.cond._nwaiters, 0)
    self._checkResources(False, False, False)

  def testNotify(self):
    self.cond.StartWaiting()
    self.cond.StartWaiting()

    self._checkResources(True, True, True)

    self.cond.notifyAll()

    # only the write side is gone after the notification
    self._checkResources(True, False, True)

    self.assert_(not self.cond.DoneWaiting())

    self._checkResources(True, False, True)

    self.assert_(self.cond.DoneWaiting())

    self._checkResources(False, False, False)

  def testReusage(self):
    self.cond.StartWaiting()
    self._checkResources(True, True, True)

    self.assert_(self.cond.DoneWaiting())

    # a condition which was used up can't be waited on again
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
    self._checkResources(False, False, False)

  def testNotifyTwice(self):
    self.cond.notifyAll()
    self.assertRaises(RuntimeError, self.cond.notifyAll)
287     self.assertRaises(RuntimeError, self.cond.notifyAll)
288
289
class TestSharedLock(_ThreadedTestCase):
  """SharedLock tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock()

  def testSequenceAndOwnership(self):
    self.assert_(not self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assert_(self.sl._is_owned())
    self.assert_(self.sl._is_owned(shared=1))
    self.assert_(not self.sl._is_owned(shared=0))
    self.sl.release()
    self.assert_(not self.sl._is_owned())
    self.sl.acquire()
    self.assert_(self.sl._is_owned())
    self.assert_(not self.sl._is_owned(shared=1))
    self.assert_(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assert_(not self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assert_(self.sl._is_owned())
    self.assert_(self.sl._is_owned(shared=1))
    self.assert_(not self.sl._is_owned(shared=0))
    self.sl.release()
    self.assert_(not self.sl._is_owned())

  def testBooleanValue(self):
    # semaphores are supposed to return a true value on a successful acquire
    self.assert_(self.sl.acquire(shared=1))
    self.sl.release()
    self.assert_(self.sl.acquire())
    self.sl.release()

  # The lock is deliberately left held in the following four tests; setUp
  # creates a new lock for every test.
  def testDoubleLockingStoE(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire)

  def testDoubleLockingEtoS(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingStoS(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingEtoE(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire)

  # helper functions: called in a separate thread they acquire the lock, send
  # their identifier on the done queue, then release it.
  def _doItSharer(self):
    """Acquire in shared mode; report 'SHR', or 'ERR' on a LockError."""
    try:
      self.sl.acquire(shared=1)
      self.done.put('SHR')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItExclusive(self):
    """Acquire exclusively; report 'EXC', or 'ERR' on a LockError."""
    try:
      self.sl.acquire()
      self.done.put('EXC')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItDelete(self):
    """Delete the lock; report 'DEL', or 'ERR' on a LockError."""
    try:
      self.sl.delete()
      self.done.put('DEL')
    except errors.LockError:
      self.done.put('ERR')

  def testSharersCanCoexist(self):
    self.sl.acquire(shared=1)
    # Track the helper like all other tests do, so that it is joined before
    # the test ends instead of being left running
    self._addThread(target=self._doItSharer)
    self.assert_(self.done.get(True, 1))
    self.sl.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    self.sl.acquire()
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
    # The lock was deleted, the next iteration needs a new one
    self.sl = locking.SharedLock()

  @_Repeat
  def testExclusiveBlocksSharer(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testSharerBlocksDelete(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
    # The lock was deleted, the next iteration needs a new one
    self.sl = locking.SharedLock()

  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlocksSharer"""
    # Test is disabled; everything below the return is kept for reference
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # Test is disabled; everything below the return is kept for reference
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')

  def testDelete(self):
    self.sl.delete()
    self.assertRaises(errors.LockError, self.sl.acquire)
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
    self.assertRaises(errors.LockError, self.sl.delete)

  def testDeleteTimeout(self):
    self.sl.delete(timeout=60)

  def testNoDeleteIfSharer(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.delete)

  @_Repeat
  def testDeletePendingSharersExclusiveDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItDelete)
    self.sl.delete()
    self._waitThreads()
    # The threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.sl = locking.SharedLock()

  @_Repeat
  def testDeletePendingDeleteExclusiveSharers(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self.sl.delete()
    self._waitThreads()
    # All four threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.sl = locking.SharedLock()

  @_Repeat
  def testExclusiveAcquireTimeout(self):
    def _LockExclusive(wait):
      self.sl.acquire(shared=0)
      self.done.put("A: start sleep")
      time.sleep(wait)
      self.done.put("A: end sleep")
      self.sl.release()

    for shared in [0, 1]:
      # Start thread to hold lock for 20 ms
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))

      # Wait for sleep to begin
      self.assertEqual(self.done.get(), "A: start sleep")

      # Wait up to 100 ms to get lock
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
      self.done.put("got 2nd")
      self.sl.release()

      self._waitThreads()

      self.assertEqual(self.done.get_nowait(), "A: end sleep")
      self.assertEqual(self.done.get_nowait(), "got 2nd")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testAcquireExpiringTimeout(self):
    def _AcquireWithTimeout(shared, timeout):
      if not self.sl.acquire(shared=shared, timeout=timeout):
        self.done.put("timeout")

    for shared in [0, 1]:
      # Lock exclusively
      self.sl.acquire()

      # Start acquires (exclusive in the first round, shared in the second)
      # with timeout between 0 and 20 ms
      for i in xrange(11):
        self._addThread(target=_AcquireWithTimeout,
                        args=(shared, i * 2.0 / 1000.0))

      # Wait for threads to finish (makes sure the acquire timeout expires
      # before releasing the lock)
      self._waitThreads()

      # Release lock
      self.sl.release()

      for _ in xrange(11):
        self.assertEqual(self.done.get_nowait(), "timeout")

      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testSharedSkipExclusiveAcquires(self):
    # Tests whether shared acquires jump in front of exclusive acquires in the
    # queue.

    # Get exclusive lock while we fill the queue
    self.sl.acquire()

    def _Acquire(shared, name):
      if not self.sl.acquire(shared=shared):
        return

      self.done.put(name)
      self.sl.release()

    # Start shared acquires
    for _ in xrange(5):
      self._addThread(target=_Acquire, args=(1, "shared A"))

    # Start exclusive acquires
    for _ in xrange(3):
      self._addThread(target=_Acquire, args=(0, "exclusive B"))

    # More shared acquires
    for _ in xrange(5):
      self._addThread(target=_Acquire, args=(1, "shared C"))

    # More exclusive acquires
    for _ in xrange(3):
      self._addThread(target=_Acquire, args=(0, "exclusive D"))

    # Expect 6 pending exclusive acquires and 1 for all shared acquires
    # together. There's no way to wait for SharedLock.acquire to start
    # its work. Hence the timeout of 2 seconds.
    pending = 0
    end_time = time.time() + 2.0
    while time.time() < end_time:
      pending = self.sl._count_pending()
      self.assert_(pending >= 0 and pending <= 7)
      if pending == 7:
        break
      time.sleep(0.05)
    self.assertEqual(pending, 7)

    # Release exclusive lock and wait
    self.sl.release()

    self._waitThreads()

    # Check sequence
    shr_a = 0
    shr_c = 0
    for _ in xrange(10):
      # Shared locks aren't guaranteed to be notified in order, but they'll be
      # first
      tmp = self.done.get_nowait()
      if tmp == "shared A":
        shr_a += 1
      elif tmp == "shared C":
        shr_c += 1
    # Any other value among the first ten shows up as a wrong count here
    self.assertEqual(shr_a, 5)
    self.assertEqual(shr_c, 5)

    for _ in xrange(3):
      self.assertEqual(self.done.get_nowait(), "exclusive B")

    for _ in xrange(3):
      self.assertEqual(self.done.get_nowait(), "exclusive D")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testMixedAcquireTimeout(self):
    sync = threading.Condition()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification
      sync.acquire()
      try:
        sync.wait()
      finally:
        sync.release()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in xrange(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Condition()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      exclsync.acquire()
      try:
        exclsync.wait()
      finally:
        exclsync.release()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.acquire()
    try:
      sync.notifyAll()
    finally:
      sync.release()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in xrange(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.acquire()
    try:
      exclsync.notifyAll()
    finally:
      exclsync.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence
    for _ in xrange(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in xrange(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)
733
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Report 'EXC' after checking that the global lock is owned."""
    self.assert_(_decoratorlock._is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Report 'SHR' after checking that the global lock is owned shared."""
    self.assert_(_decoratorlock._is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    self._doItExclusive()
    self.assert_(not _decoratorlock._is_owned())
    self._doItSharer()
    self.assert_(not _decoratorlock._is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    # Track the helper like all other tests do, so that it is joined before
    # the test ends instead of being left running
    self._addThread(target=self._doItSharer)
    self.assert_(self.done.get(True, 1))
    _decoratorlock.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItExclusive)
    # the helper can't have reported anything while we hold the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
789
790
791 class TestLockSet(_ThreadedTestCase):
792   """LockSet tests"""
793
794   def setUp(self):
795     _ThreadedTestCase.setUp(self)
796     self._setUpLS()
797
798   def _setUpLS(self):
799     """Helper to (re)initialize the lock set"""
800     self.resources = ['one', 'two', 'three']
801     self.ls = locking.LockSet(members=self.resources)
802
803   def testResources(self):
804     self.assertEquals(self.ls._names(), set(self.resources))
805     newls = locking.LockSet()
806     self.assertEquals(newls._names(), set())
807
808   def testAcquireRelease(self):
809     self.assert_(self.ls.acquire('one'))
810     self.assertEquals(self.ls._list_owned(), set(['one']))
811     self.ls.release()
812     self.assertEquals(self.ls._list_owned(), set())
813     self.assertEquals(self.ls.acquire(['one']), set(['one']))
814     self.assertEquals(self.ls._list_owned(), set(['one']))
815     self.ls.release()
816     self.assertEquals(self.ls._list_owned(), set())
817     self.ls.acquire(['one', 'two', 'three'])
818     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
819     self.ls.release('one')
820     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
821     self.ls.release(['three'])
822     self.assertEquals(self.ls._list_owned(), set(['two']))
823     self.ls.release()
824     self.assertEquals(self.ls._list_owned(), set())
825     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
826     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
827     self.ls.release()
828     self.assertEquals(self.ls._list_owned(), set())
829
830   def testNoDoubleAcquire(self):
831     self.ls.acquire('one')
832     self.assertRaises(AssertionError, self.ls.acquire, 'one')
833     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
834     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
835     self.ls.release()
836     self.ls.acquire(['one', 'three'])
837     self.ls.release('one')
838     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
839     self.ls.release('three')
840
841   def testNoWrongRelease(self):
842     self.assertRaises(AssertionError, self.ls.release)
843     self.ls.acquire('one')
844     self.assertRaises(AssertionError, self.ls.release, 'two')
845
846   def testAddRemove(self):
847     self.ls.add('four')
848     self.assertEquals(self.ls._list_owned(), set())
849     self.assert_('four' in self.ls._names())
850     self.ls.add(['five', 'six', 'seven'], acquired=1)
851     self.assert_('five' in self.ls._names())
852     self.assert_('six' in self.ls._names())
853     self.assert_('seven' in self.ls._names())
854     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
855     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
856     self.assert_('five' not in self.ls._names())
857     self.assert_('six' not in self.ls._names())
858     self.assertEquals(self.ls._list_owned(), set(['seven']))
859     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
860     self.ls.remove('seven')
861     self.assert_('seven' not in self.ls._names())
862     self.assertEquals(self.ls._list_owned(), set([]))
863     self.ls.acquire(None, shared=1)
864     self.assertRaises(AssertionError, self.ls.add, 'eight')
865     self.ls.release()
866     self.ls.acquire(None)
867     self.ls.add('eight', acquired=1)
868     self.assert_('eight' in self.ls._names())
869     self.assert_('eight' in self.ls._list_owned())
870     self.ls.add('nine')
871     self.assert_('nine' in self.ls._names())
872     self.assert_('nine' not in self.ls._list_owned())
873     self.ls.release()
874     self.ls.remove(['two'])
875     self.assert_('two' not in self.ls._names())
876     self.ls.acquire('three')
877     self.assertEquals(self.ls.remove(['three']), ['three'])
878     self.assert_('three' not in self.ls._names())
879     self.assertEquals(self.ls.remove('three'), [])
880     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
881     self.assert_('one' not in self.ls._names())
882
883   def testRemoveNonBlocking(self):
884     self.ls.acquire('one')
885     self.assertEquals(self.ls.remove('one'), ['one'])
886     self.ls.acquire(['two', 'three'])
887     self.assertEquals(self.ls.remove(['two', 'three']),
888                       ['two', 'three'])
889
890   def testNoDoubleAdd(self):
891     self.assertRaises(errors.LockError, self.ls.add, 'two')
892     self.ls.add('four')
893     self.assertRaises(errors.LockError, self.ls.add, 'four')
894
895   def testNoWrongRemoves(self):
896     self.ls.acquire(['one', 'three'], shared=1)
897     # Cannot remove 'two' while holding something which is not a superset
898     self.assertRaises(AssertionError, self.ls.remove, 'two')
899     # Cannot remove 'three' as we are sharing it
900     self.assertRaises(AssertionError, self.ls.remove, 'three')
901
902   def testAcquireSetLock(self):
903     # acquire the set-lock exclusively
904     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
905     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
906     self.assertEquals(self.ls._is_owned(), True)
907     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
908     # I can still add/remove elements...
909     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
910     self.assert_(self.ls.add('six'))
911     self.ls.release()
912     # share the set-lock
913     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
914     # adding new elements is not possible
915     self.assertRaises(AssertionError, self.ls.add, 'five')
916     self.ls.release()
917
918   def testAcquireWithRepetitions(self):
919     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
920                       set(['two', 'two', 'three']))
921     self.ls.release(['two', 'two'])
922     self.assertEquals(self.ls._list_owned(), set(['three']))
923
924   def testEmptyAcquire(self):
925     # Acquire an empty list of locks...
926     self.assertEquals(self.ls.acquire([]), set())
927     self.assertEquals(self.ls._list_owned(), set())
928     # New locks can still be addded
929     self.assert_(self.ls.add('six'))
930     # "re-acquiring" is not an issue, since we had really acquired nothing
931     self.assertEquals(self.ls.acquire([], shared=1), set())
932     self.assertEquals(self.ls._list_owned(), set())
933     # We haven't really acquired anything, so we cannot release
934     self.assertRaises(AssertionError, self.ls.release)
935
936   def _doLockSet(self, names, shared):
937     try:
938       self.ls.acquire(names, shared=shared)
939       self.done.put('DONE')
940       self.ls.release()
941     except errors.LockError:
942       self.done.put('ERR')
943
944   def _doAddSet(self, names):
945     try:
946       self.ls.add(names, acquired=1)
947       self.done.put('DONE')
948       self.ls.release()
949     except errors.LockError:
950       self.done.put('ERR')
951
952   def _doRemoveSet(self, names):
953     self.done.put(self.ls.remove(names))
954
955   @_Repeat
956   def testConcurrentSharedAcquire(self):
957     self.ls.acquire(['one', 'two'], shared=1)
958     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
959     self._waitThreads()
960     self.assertEqual(self.done.get_nowait(), 'DONE')
961     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
962     self._waitThreads()
963     self.assertEqual(self.done.get_nowait(), 'DONE')
964     self._addThread(target=self._doLockSet, args=('three', 1))
965     self._waitThreads()
966     self.assertEqual(self.done.get_nowait(), 'DONE')
967     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
968     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
969     self.assertRaises(Queue.Empty, self.done.get_nowait)
970     self.ls.release()
971     self._waitThreads()
972     self.assertEqual(self.done.get_nowait(), 'DONE')
973     self.assertEqual(self.done.get_nowait(), 'DONE')
974
975   @_Repeat
976   def testConcurrentExclusiveAcquire(self):
977     self.ls.acquire(['one', 'two'])
978     self._addThread(target=self._doLockSet, args=('three', 1))
979     self._waitThreads()
980     self.assertEqual(self.done.get_nowait(), 'DONE')
981     self._addThread(target=self._doLockSet, args=('three', 0))
982     self._waitThreads()
983     self.assertEqual(self.done.get_nowait(), 'DONE')
984     self.assertRaises(Queue.Empty, self.done.get_nowait)
985     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
986     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
987     self._addThread(target=self._doLockSet, args=('one', 0))
988     self._addThread(target=self._doLockSet, args=('one', 1))
989     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
990     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
991     self.assertRaises(Queue.Empty, self.done.get_nowait)
992     self.ls.release()
993     self._waitThreads()
994     for _ in range(6):
995       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
996
997   @_Repeat
998   def testConcurrentRemove(self):
999     self.ls.add('four')
1000     self.ls.acquire(['one', 'two', 'four'])
1001     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1002     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1003     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1004     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1005     self.assertRaises(Queue.Empty, self.done.get_nowait)
1006     self.ls.remove('one')
1007     self.ls.release()
1008     self._waitThreads()
1009     for i in range(4):
1010       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1011     self.ls.add(['five', 'six'], acquired=1)
1012     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1013     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1014     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1015     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1016     self.ls.remove('five')
1017     self.ls.release()
1018     self._waitThreads()
1019     for i in range(4):
1020       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1021     self.ls.acquire(['three', 'four'])
1022     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1023     self.assertRaises(Queue.Empty, self.done.get_nowait)
1024     self.ls.remove('four')
1025     self._waitThreads()
1026     self.assertEqual(self.done.get_nowait(), ['six'])
1027     self._addThread(target=self._doRemoveSet, args=(['two']))
1028     self._waitThreads()
1029     self.assertEqual(self.done.get_nowait(), ['two'])
1030     self.ls.release()
1031     # reset lockset
1032     self._setUpLS()
1033
1034   @_Repeat
1035   def testConcurrentSharedSetLock(self):
1036     # share the set-lock...
1037     self.ls.acquire(None, shared=1)
1038     # ...another thread can share it too
1039     self._addThread(target=self._doLockSet, args=(None, 1))
1040     self._waitThreads()
1041     self.assertEqual(self.done.get_nowait(), 'DONE')
1042     # ...or just share some elements
1043     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1044     self._waitThreads()
1045     self.assertEqual(self.done.get_nowait(), 'DONE')
1046     # ...but not add new ones or remove any
1047     t = self._addThread(target=self._doAddSet, args=(['nine']))
1048     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1049     self.assertRaises(Queue.Empty, self.done.get_nowait)
1050     # this just releases the set-lock
1051     self.ls.release([])
1052     t.join(60)
1053     self.assertEqual(self.done.get_nowait(), 'DONE')
1054     # release the lock on the actual elements so remove() can proceed too
1055     self.ls.release()
1056     self._waitThreads()
1057     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1058     # reset lockset
1059     self._setUpLS()
1060
1061   @_Repeat
1062   def testConcurrentExclusiveSetLock(self):
1063     # acquire the set-lock...
1064     self.ls.acquire(None, shared=0)
1065     # ...no one can do anything else
1066     self._addThread(target=self._doLockSet, args=(None, 1))
1067     self._addThread(target=self._doLockSet, args=(None, 0))
1068     self._addThread(target=self._doLockSet, args=(['three'], 0))
1069     self._addThread(target=self._doLockSet, args=(['two'], 1))
1070     self._addThread(target=self._doAddSet, args=(['nine']))
1071     self.assertRaises(Queue.Empty, self.done.get_nowait)
1072     self.ls.release()
1073     self._waitThreads()
1074     for _ in range(5):
1075       self.assertEqual(self.done.get(True, 1), 'DONE')
1076     # cleanup
1077     self._setUpLS()
1078
1079   @_Repeat
1080   def testConcurrentSetLockAdd(self):
1081     self.ls.acquire('one')
1082     # Another thread wants the whole SetLock
1083     self._addThread(target=self._doLockSet, args=(None, 0))
1084     self._addThread(target=self._doLockSet, args=(None, 1))
1085     self.assertRaises(Queue.Empty, self.done.get_nowait)
1086     self.assertRaises(AssertionError, self.ls.add, 'four')
1087     self.ls.release()
1088     self._waitThreads()
1089     self.assertEqual(self.done.get_nowait(), 'DONE')
1090     self.assertEqual(self.done.get_nowait(), 'DONE')
1091     self.ls.acquire(None)
1092     self._addThread(target=self._doLockSet, args=(None, 0))
1093     self._addThread(target=self._doLockSet, args=(None, 1))
1094     self.assertRaises(Queue.Empty, self.done.get_nowait)
1095     self.ls.add('four')
1096     self.ls.add('five', acquired=1)
1097     self.ls.add('six', acquired=1, shared=1)
1098     self.assertEquals(self.ls._list_owned(),
1099       set(['one', 'two', 'three', 'five', 'six']))
1100     self.assertEquals(self.ls._is_owned(), True)
1101     self.assertEquals(self.ls._names(),
1102       set(['one', 'two', 'three', 'four', 'five', 'six']))
1103     self.ls.release()
1104     self._waitThreads()
1105     self.assertEqual(self.done.get_nowait(), 'DONE')
1106     self.assertEqual(self.done.get_nowait(), 'DONE')
1107     self._setUpLS()
1108
1109   @_Repeat
1110   def testEmptyLockSet(self):
1111     # get the set-lock
1112     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1113     # now empty it...
1114     self.ls.remove(['one', 'two', 'three'])
1115     # and adds/locks by another thread still wait
1116     self._addThread(target=self._doAddSet, args=(['nine']))
1117     self._addThread(target=self._doLockSet, args=(None, 1))
1118     self._addThread(target=self._doLockSet, args=(None, 0))
1119     self.assertRaises(Queue.Empty, self.done.get_nowait)
1120     self.ls.release()
1121     self._waitThreads()
1122     for _ in range(3):
1123       self.assertEqual(self.done.get_nowait(), 'DONE')
1124     # empty it again...
1125     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1126     # now share it...
1127     self.assertEqual(self.ls.acquire(None, shared=1), set())
1128     # other sharers can go, adds still wait
1129     self._addThread(target=self._doLockSet, args=(None, 1))
1130     self._waitThreads()
1131     self.assertEqual(self.done.get_nowait(), 'DONE')
1132     self._addThread(target=self._doAddSet, args=(['nine']))
1133     self.assertRaises(Queue.Empty, self.done.get_nowait)
1134     self.ls.release()
1135     self._waitThreads()
1136     self.assertEqual(self.done.get_nowait(), 'DONE')
1137     self._setUpLS()
1138
1139
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for locking.GanetiLockManager

  Each test starts with a manager built from two nodes ('n1', 'n2') and
  three instances ('i1', 'i2', 'i3').

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.nodes = ['n1', 'n2']
    self.instances = ['i1', 'i2', 'i3']
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
                                        instances=self.instances)

  def tearDown(self):
    # Don't try this at home...
    # Clearing the class-level instance lets the next setUp() build a new
    # manager; creating a second one otherwise asserts (see testDoubleGLFails)
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    """Each entry of locking.LEVELS must equal its own index"""
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for index, level in enumerate(locking.LEVELS):
      self.assertEqual(index, level)

  def testDoubleGLFails(self):
    """A second manager cannot be created while one already exists"""
    self.assertRaises(AssertionError, locking.GanetiLockManager)

  def testLockNames(self):
    """Each level exposes the names the manager was built with"""
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testInitAndResources(self):
    """Omitted constructor arguments yield empty levels; BGL always exists"""
    # No resources at all
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager()
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())

    # Nodes only
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())

    # Instances only
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager(instances=self.instances)
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testAcquireRelease(self):
    """Acquire and release at several levels and check what is owned"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
    # Partial release at one level leaves the rest untouched
    self.GL.release(locking.LEVEL_NODE, ['n2'])
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    self.GL.release(locking.LEVEL_INSTANCE)
    # 'i5' was never part of the instance set
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))

  def testAcquireWholeSets(self):
    """Passing None as names acquires every lock of a level"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    """A whole-level acquire can be combined with a partial one elsewhere"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(['n2']))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    """The BGL must be held before, and released after, any other lock"""
    # Nothing can be acquired without the BGL
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1', 'n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i3'])
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
    # The BGL cannot be released while a node lock is held...
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    # ...nor while instance locks are held
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)

  def testWrongOrder(self):
    """Acquiring again at the node or instance level after nodes must assert"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i2'])

  def _doLock(self, level, names, shared):
    """Thread body: share the BGL, then acquire names at another level

    Puts 'DONE' on self.done once both acquisitions have returned, or 'ERR'
    if a LockError is raised along the way.

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
      self.GL.acquire(level, names, shared=shared)
      self.done.put('DONE')
      self.GL.release(level)
      self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put('ERR')

  @_Repeat
  def testConcurrency(self):
    """Check shared/exclusive interaction between threads at instance level"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # Nothing held at instance level yet: a sharer goes straight through
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # Holding 'i3' exclusively does not affect 'i1'...
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...but a sharer of 'i3' must not report until we release it
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # Sharing 'i2' lets another sharer in...
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...while an exclusive request must not report until we release
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1304
1305
if __name__ == '__main__':
  # Run the test cases defined in this module
  unittest.main()
  # Alternative, kept for debugging: run only TestSharedLock, verbosely
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
  #unittest.TextTestRunner(verbosity=2).run(suite)