luxi: Pass socket path directly to exception, not in tuple
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30
31 from ganeti import locking
32 from ganeti import errors
33
34
35 # This is used to test the ssynchronize decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
38
39 #: List for looping tests
40 ITERATIONS = range(8)
41
42
43 def _Repeat(fn):
44   """Decorator for executing a function many times"""
45   def wrapper(*args, **kwargs):
46     for i in ITERATIONS:
47       fn(*args, **kwargs)
48   return wrapper
49
50
51 def SafeSleep(duration):
52   start = time.time()
53   while True:
54     delay = start + duration - time.time()
55     if delay <= 0.0:
56       break
57     time.sleep(delay)
58
59
60 class _ThreadedTestCase(unittest.TestCase):
61   """Test class that supports adding/waiting on threads"""
62   def setUp(self):
63     unittest.TestCase.setUp(self)
64     self.done = Queue.Queue(0)
65     self.threads = []
66
67   def _addThread(self, *args, **kwargs):
68     """Create and remember a new thread"""
69     t = threading.Thread(*args, **kwargs)
70     self.threads.append(t)
71     t.start()
72     return t
73
74   def _waitThreads(self):
75     """Wait for all our threads to finish"""
76     for t in self.threads:
77       t.join(60)
78       self.failIf(t.isAlive())
79     self.threads = []
80
81
82 class _ConditionTestCase(_ThreadedTestCase):
83   """Common test case for conditions"""
84
85   def setUp(self, cls):
86     _ThreadedTestCase.setUp(self)
87     self.lock = threading.Lock()
88     self.cond = cls(self.lock)
89
90   def _testAcquireRelease(self):
91     self.assert_(not self.cond._is_owned())
92     self.assertRaises(RuntimeError, self.cond.wait)
93     self.assertRaises(RuntimeError, self.cond.notifyAll)
94
95     self.cond.acquire()
96     self.assert_(self.cond._is_owned())
97     self.cond.notifyAll()
98     self.assert_(self.cond._is_owned())
99     self.cond.release()
100
101     self.assert_(not self.cond._is_owned())
102     self.assertRaises(RuntimeError, self.cond.wait)
103     self.assertRaises(RuntimeError, self.cond.notifyAll)
104
105   def _testNotification(self):
106     def _NotifyAll():
107       self.done.put("NE")
108       self.cond.acquire()
109       self.done.put("NA")
110       self.cond.notifyAll()
111       self.done.put("NN")
112       self.cond.release()
113
114     self.cond.acquire()
115     self._addThread(target=_NotifyAll)
116     self.assertEqual(self.done.get(True, 1), "NE")
117     self.assertRaises(Queue.Empty, self.done.get_nowait)
118     self.cond.wait()
119     self.assertEqual(self.done.get(True, 1), "NA")
120     self.assertEqual(self.done.get(True, 1), "NN")
121     self.assert_(self.cond._is_owned())
122     self.cond.release()
123     self.assert_(not self.cond._is_owned())
124
125
126 class TestSingleNotifyPipeCondition(_ConditionTestCase):
127   """SingleNotifyPipeCondition tests"""
128
129   def setUp(self):
130     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
131
132   def testAcquireRelease(self):
133     self._testAcquireRelease()
134
135   def testNotification(self):
136     self._testNotification()
137
138   def testWaitReuse(self):
139     self.cond.acquire()
140     self.cond.wait(0)
141     self.cond.wait(0.1)
142     self.cond.release()
143
144   def testNoNotifyReuse(self):
145     self.cond.acquire()
146     self.cond.notifyAll()
147     self.assertRaises(RuntimeError, self.cond.wait)
148     self.assertRaises(RuntimeError, self.cond.notifyAll)
149     self.cond.release()
150
151
152 class TestPipeCondition(_ConditionTestCase):
153   """PipeCondition tests"""
154
155   def setUp(self):
156     _ConditionTestCase.setUp(self, locking.PipeCondition)
157
158   def testAcquireRelease(self):
159     self._testAcquireRelease()
160
161   def testNotification(self):
162     self._testNotification()
163
164   def _TestWait(self, fn):
165     self._addThread(target=fn)
166     self._addThread(target=fn)
167     self._addThread(target=fn)
168
169     # Wait for threads to be waiting
170     self.assertEqual(self.done.get(True, 1), "A")
171     self.assertEqual(self.done.get(True, 1), "A")
172     self.assertEqual(self.done.get(True, 1), "A")
173
174     self.assertRaises(Queue.Empty, self.done.get_nowait)
175
176     self.cond.acquire()
177     self.assertEqual(self.cond._nwaiters, 3)
178     # This new thread can"t acquire the lock, and thus call wait, before we
179     # release it
180     self._addThread(target=fn)
181     self.cond.notifyAll()
182     self.assertRaises(Queue.Empty, self.done.get_nowait)
183     self.cond.release()
184
185     # We should now get 3 W and 1 A (for the new thread) in whatever order
186     w = 0
187     a = 0
188     for i in range(4):
189       got = self.done.get(True, 1)
190       if got == "W":
191         w += 1
192       elif got == "A":
193         a += 1
194       else:
195         self.fail("Got %s on the done queue" % got)
196
197     self.assertEqual(w, 3)
198     self.assertEqual(a, 1)
199
200     self.cond.acquire()
201     self.cond.notifyAll()
202     self.cond.release()
203     self._waitThreads()
204     self.assertEqual(self.done.get_nowait(), "W")
205     self.assertRaises(Queue.Empty, self.done.get_nowait)
206
207   def testBlockingWait(self):
208     def _BlockingWait():
209       self.cond.acquire()
210       self.done.put("A")
211       self.cond.wait()
212       self.cond.release()
213       self.done.put("W")
214
215     self._TestWait(_BlockingWait)
216
217   def testLongTimeoutWait(self):
218     def _Helper():
219       self.cond.acquire()
220       self.done.put("A")
221       self.cond.wait(15.0)
222       self.cond.release()
223       self.done.put("W")
224
225     self._TestWait(_Helper)
226
227   def _TimeoutWait(self, timeout, check):
228     self.cond.acquire()
229     self.cond.wait(timeout)
230     self.cond.release()
231     self.done.put(check)
232
233   def testShortTimeoutWait(self):
234     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
235     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
236     self._waitThreads()
237     self.assertEqual(self.done.get_nowait(), "T1")
238     self.assertEqual(self.done.get_nowait(), "T1")
239     self.assertRaises(Queue.Empty, self.done.get_nowait)
240
241   def testZeroTimeoutWait(self):
242     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
243     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
244     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
245     self._waitThreads()
246     self.assertEqual(self.done.get_nowait(), "T0")
247     self.assertEqual(self.done.get_nowait(), "T0")
248     self.assertEqual(self.done.get_nowait(), "T0")
249     self.assertRaises(Queue.Empty, self.done.get_nowait)
250
251
252 class TestSharedLock(_ThreadedTestCase):
253   """SharedLock tests"""
254
255   def setUp(self):
256     _ThreadedTestCase.setUp(self)
257     self.sl = locking.SharedLock()
258
259   def testSequenceAndOwnership(self):
260     self.assert_(not self.sl._is_owned())
261     self.sl.acquire(shared=1)
262     self.assert_(self.sl._is_owned())
263     self.assert_(self.sl._is_owned(shared=1))
264     self.assert_(not self.sl._is_owned(shared=0))
265     self.sl.release()
266     self.assert_(not self.sl._is_owned())
267     self.sl.acquire()
268     self.assert_(self.sl._is_owned())
269     self.assert_(not self.sl._is_owned(shared=1))
270     self.assert_(self.sl._is_owned(shared=0))
271     self.sl.release()
272     self.assert_(not self.sl._is_owned())
273     self.sl.acquire(shared=1)
274     self.assert_(self.sl._is_owned())
275     self.assert_(self.sl._is_owned(shared=1))
276     self.assert_(not self.sl._is_owned(shared=0))
277     self.sl.release()
278     self.assert_(not self.sl._is_owned())
279
280   def testBooleanValue(self):
281     # semaphores are supposed to return a true value on a successful acquire
282     self.assert_(self.sl.acquire(shared=1))
283     self.sl.release()
284     self.assert_(self.sl.acquire())
285     self.sl.release()
286
287   def testDoubleLockingStoE(self):
288     self.sl.acquire(shared=1)
289     self.assertRaises(AssertionError, self.sl.acquire)
290
291   def testDoubleLockingEtoS(self):
292     self.sl.acquire()
293     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
294
295   def testDoubleLockingStoS(self):
296     self.sl.acquire(shared=1)
297     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
298
299   def testDoubleLockingEtoE(self):
300     self.sl.acquire()
301     self.assertRaises(AssertionError, self.sl.acquire)
302
303   # helper functions: called in a separate thread they acquire the lock, send
304   # their identifier on the done queue, then release it.
305   def _doItSharer(self):
306     try:
307       self.sl.acquire(shared=1)
308       self.done.put('SHR')
309       self.sl.release()
310     except errors.LockError:
311       self.done.put('ERR')
312
313   def _doItExclusive(self):
314     try:
315       self.sl.acquire()
316       self.done.put('EXC')
317       self.sl.release()
318     except errors.LockError:
319       self.done.put('ERR')
320
321   def _doItDelete(self):
322     try:
323       self.sl.delete()
324       self.done.put('DEL')
325     except errors.LockError:
326       self.done.put('ERR')
327
328   def testSharersCanCoexist(self):
329     self.sl.acquire(shared=1)
330     threading.Thread(target=self._doItSharer).start()
331     self.assert_(self.done.get(True, 1))
332     self.sl.release()
333
334   @_Repeat
335   def testExclusiveBlocksExclusive(self):
336     self.sl.acquire()
337     self._addThread(target=self._doItExclusive)
338     self.assertRaises(Queue.Empty, self.done.get_nowait)
339     self.sl.release()
340     self._waitThreads()
341     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
342
343   @_Repeat
344   def testExclusiveBlocksDelete(self):
345     self.sl.acquire()
346     self._addThread(target=self._doItDelete)
347     self.assertRaises(Queue.Empty, self.done.get_nowait)
348     self.sl.release()
349     self._waitThreads()
350     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
351     self.sl = locking.SharedLock()
352
353   @_Repeat
354   def testExclusiveBlocksSharer(self):
355     self.sl.acquire()
356     self._addThread(target=self._doItSharer)
357     self.assertRaises(Queue.Empty, self.done.get_nowait)
358     self.sl.release()
359     self._waitThreads()
360     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
361
362   @_Repeat
363   def testSharerBlocksExclusive(self):
364     self.sl.acquire(shared=1)
365     self._addThread(target=self._doItExclusive)
366     self.assertRaises(Queue.Empty, self.done.get_nowait)
367     self.sl.release()
368     self._waitThreads()
369     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
370
371   @_Repeat
372   def testSharerBlocksDelete(self):
373     self.sl.acquire(shared=1)
374     self._addThread(target=self._doItDelete)
375     self.assertRaises(Queue.Empty, self.done.get_nowait)
376     self.sl.release()
377     self._waitThreads()
378     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
379     self.sl = locking.SharedLock()
380
381   @_Repeat
382   def testWaitingExclusiveBlocksSharer(self):
383     """SKIPPED testWaitingExclusiveBlockSharer"""
384     return
385
386     self.sl.acquire(shared=1)
387     # the lock is acquired in shared mode...
388     self._addThread(target=self._doItExclusive)
389     # ...but now an exclusive is waiting...
390     self._addThread(target=self._doItSharer)
391     # ...so the sharer should be blocked as well
392     self.assertRaises(Queue.Empty, self.done.get_nowait)
393     self.sl.release()
394     self._waitThreads()
395     # The exclusive passed before
396     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
397     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
398
399   @_Repeat
400   def testWaitingSharerBlocksExclusive(self):
401     """SKIPPED testWaitingSharerBlocksExclusive"""
402     return
403
404     self.sl.acquire()
405     # the lock is acquired in exclusive mode...
406     self._addThread(target=self._doItSharer)
407     # ...but now a sharer is waiting...
408     self._addThread(target=self._doItExclusive)
409     # ...the exclusive is waiting too...
410     self.assertRaises(Queue.Empty, self.done.get_nowait)
411     self.sl.release()
412     self._waitThreads()
413     # The sharer passed before
414     self.assertEqual(self.done.get_nowait(), 'SHR')
415     self.assertEqual(self.done.get_nowait(), 'EXC')
416
417   def testDelete(self):
418     self.sl.delete()
419     self.assertRaises(errors.LockError, self.sl.acquire)
420     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
421     self.assertRaises(errors.LockError, self.sl.delete)
422
423   def testDeleteTimeout(self):
424     self.sl.delete(timeout=60)
425
426   def testNoDeleteIfSharer(self):
427     self.sl.acquire(shared=1)
428     self.assertRaises(AssertionError, self.sl.delete)
429
430   @_Repeat
431   def testDeletePendingSharersExclusiveDelete(self):
432     self.sl.acquire()
433     self._addThread(target=self._doItSharer)
434     self._addThread(target=self._doItSharer)
435     self._addThread(target=self._doItExclusive)
436     self._addThread(target=self._doItDelete)
437     self.sl.delete()
438     self._waitThreads()
439     # The threads who were pending return ERR
440     for _ in range(4):
441       self.assertEqual(self.done.get_nowait(), 'ERR')
442     self.sl = locking.SharedLock()
443
444   @_Repeat
445   def testDeletePendingDeleteExclusiveSharers(self):
446     self.sl.acquire()
447     self._addThread(target=self._doItDelete)
448     self._addThread(target=self._doItExclusive)
449     self._addThread(target=self._doItSharer)
450     self._addThread(target=self._doItSharer)
451     self.sl.delete()
452     self._waitThreads()
453     # The two threads who were pending return both ERR
454     self.assertEqual(self.done.get_nowait(), 'ERR')
455     self.assertEqual(self.done.get_nowait(), 'ERR')
456     self.assertEqual(self.done.get_nowait(), 'ERR')
457     self.assertEqual(self.done.get_nowait(), 'ERR')
458     self.sl = locking.SharedLock()
459
460   @_Repeat
461   def testExclusiveAcquireTimeout(self):
462     for shared in [0, 1]:
463       on_queue = threading.Event()
464       release_exclusive = threading.Event()
465
466       def _LockExclusive():
467         self.sl.acquire(shared=0, test_notify=on_queue.set)
468         self.done.put("A: start wait")
469         release_exclusive.wait()
470         self.done.put("A: end wait")
471         self.sl.release()
472
473       # Start thread to hold lock in exclusive mode
474       self._addThread(target=_LockExclusive)
475
476       # Wait for wait to begin
477       self.assertEqual(self.done.get(timeout=60), "A: start wait")
478
479       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
480       # on the queue
481       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
482                                       test_notify=release_exclusive.set))
483
484       self.done.put("got 2nd")
485       self.sl.release()
486
487       self._waitThreads()
488
489       self.assertEqual(self.done.get_nowait(), "A: end wait")
490       self.assertEqual(self.done.get_nowait(), "got 2nd")
491       self.assertRaises(Queue.Empty, self.done.get_nowait)
492
493   @_Repeat
494   def testAcquireExpiringTimeout(self):
495     def _AcquireWithTimeout(shared, timeout):
496       if not self.sl.acquire(shared=shared, timeout=timeout):
497         self.done.put("timeout")
498
499     for shared in [0, 1]:
500       # Lock exclusively
501       self.sl.acquire()
502
503       # Start shared acquires with timeout between 0 and 20 ms
504       for i in range(11):
505         self._addThread(target=_AcquireWithTimeout,
506                         args=(shared, i * 2.0 / 1000.0))
507
508       # Wait for threads to finish (makes sure the acquire timeout expires
509       # before releasing the lock)
510       self._waitThreads()
511
512       # Release lock
513       self.sl.release()
514
515       for _ in range(11):
516         self.assertEqual(self.done.get_nowait(), "timeout")
517
518       self.assertRaises(Queue.Empty, self.done.get_nowait)
519
520   @_Repeat
521   def testSharedSkipExclusiveAcquires(self):
522     # Tests whether shared acquires jump in front of exclusive acquires in the
523     # queue.
524
525     def _Acquire(shared, name, notify_ev, wait_ev):
526       if notify_ev:
527         notify_fn = notify_ev.set
528       else:
529         notify_fn = None
530
531       if wait_ev:
532         wait_ev.wait()
533
534       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
535         return
536
537       self.done.put(name)
538       self.sl.release()
539
540     # Get exclusive lock while we fill the queue
541     self.sl.acquire()
542
543     shrcnt1 = 5
544     shrcnt2 = 7
545     shrcnt3 = 9
546     shrcnt4 = 2
547
548     # Add acquires using threading.Event for synchronization. They'll be
549     # acquired exactly in the order defined in this list.
550     acquires = (shrcnt1 * [(1, "shared 1")] +
551                 3 * [(0, "exclusive 1")] +
552                 shrcnt2 * [(1, "shared 2")] +
553                 shrcnt3 * [(1, "shared 3")] +
554                 shrcnt4 * [(1, "shared 4")] +
555                 3 * [(0, "exclusive 2")])
556
557     ev_cur = None
558     ev_prev = None
559
560     for args in acquires:
561       ev_cur = threading.Event()
562       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
563       ev_prev = ev_cur
564
565     # Wait for last acquire to start
566     ev_prev.wait()
567
568     # Expect 6 pending exclusive acquires and 1 for all shared acquires
569     # together
570     self.assertEqual(self.sl._count_pending(), 7)
571
572     # Release exclusive lock and wait
573     self.sl.release()
574
575     self._waitThreads()
576
577     # Check sequence
578     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
579       # Shared locks aren't guaranteed to be notified in order, but they'll be
580       # first
581       tmp = self.done.get_nowait()
582       if tmp == "shared 1":
583         shrcnt1 -= 1
584       elif tmp == "shared 2":
585         shrcnt2 -= 1
586       elif tmp == "shared 3":
587         shrcnt3 -= 1
588       elif tmp == "shared 4":
589         shrcnt4 -= 1
590     self.assertEqual(shrcnt1, 0)
591     self.assertEqual(shrcnt2, 0)
592     self.assertEqual(shrcnt3, 0)
593     self.assertEqual(shrcnt3, 0)
594
595     for _ in range(3):
596       self.assertEqual(self.done.get_nowait(), "exclusive 1")
597
598     for _ in range(3):
599       self.assertEqual(self.done.get_nowait(), "exclusive 2")
600
601     self.assertRaises(Queue.Empty, self.done.get_nowait)
602
603   @_Repeat
604   def testMixedAcquireTimeout(self):
605     sync = threading.Condition()
606
607     def _AcquireShared(ev):
608       if not self.sl.acquire(shared=1, timeout=None):
609         return
610
611       self.done.put("shared")
612
613       # Notify main thread
614       ev.set()
615
616       # Wait for notification
617       sync.acquire()
618       try:
619         sync.wait()
620       finally:
621         sync.release()
622
623       # Release lock
624       self.sl.release()
625
626     acquires = []
627     for _ in range(3):
628       ev = threading.Event()
629       self._addThread(target=_AcquireShared, args=(ev, ))
630       acquires.append(ev)
631
632     # Wait for all acquires to finish
633     for i in acquires:
634       i.wait()
635
636     self.assertEqual(self.sl._count_pending(), 0)
637
638     # Try to get exclusive lock
639     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
640
641     # Acquire exclusive without timeout
642     exclsync = threading.Condition()
643     exclev = threading.Event()
644
645     def _AcquireExclusive():
646       if not self.sl.acquire(shared=0):
647         return
648
649       self.done.put("exclusive")
650
651       # Notify main thread
652       exclev.set()
653
654       exclsync.acquire()
655       try:
656         exclsync.wait()
657       finally:
658         exclsync.release()
659
660       self.sl.release()
661
662     self._addThread(target=_AcquireExclusive)
663
664     # Try to get exclusive lock
665     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
666
667     # Make all shared holders release their locks
668     sync.acquire()
669     try:
670       sync.notifyAll()
671     finally:
672       sync.release()
673
674     # Wait for exclusive acquire to succeed
675     exclev.wait()
676
677     self.assertEqual(self.sl._count_pending(), 0)
678
679     # Try to get exclusive lock
680     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
681
682     def _AcquireSharedSimple():
683       if self.sl.acquire(shared=1, timeout=None):
684         self.done.put("shared2")
685         self.sl.release()
686
687     for _ in range(10):
688       self._addThread(target=_AcquireSharedSimple)
689
690     # Tell exclusive lock to release
691     exclsync.acquire()
692     try:
693       exclsync.notifyAll()
694     finally:
695       exclsync.release()
696
697     # Wait for everything to finish
698     self._waitThreads()
699
700     self.assertEqual(self.sl._count_pending(), 0)
701
702     # Check sequence
703     for _ in range(3):
704       self.assertEqual(self.done.get_nowait(), "shared")
705
706     self.assertEqual(self.done.get_nowait(), "exclusive")
707
708     for _ in range(10):
709       self.assertEqual(self.done.get_nowait(), "shared2")
710
711     self.assertRaises(Queue.Empty, self.done.get_nowait)
712
713
714 class TestSSynchronizedDecorator(_ThreadedTestCase):
715   """Shared Lock Synchronized decorator test"""
716
717   def setUp(self):
718     _ThreadedTestCase.setUp(self)
719
720   @locking.ssynchronized(_decoratorlock)
721   def _doItExclusive(self):
722     self.assert_(_decoratorlock._is_owned())
723     self.done.put('EXC')
724
725   @locking.ssynchronized(_decoratorlock, shared=1)
726   def _doItSharer(self):
727     self.assert_(_decoratorlock._is_owned(shared=1))
728     self.done.put('SHR')
729
730   def testDecoratedFunctions(self):
731     self._doItExclusive()
732     self.assert_(not _decoratorlock._is_owned())
733     self._doItSharer()
734     self.assert_(not _decoratorlock._is_owned())
735
736   def testSharersCanCoexist(self):
737     _decoratorlock.acquire(shared=1)
738     threading.Thread(target=self._doItSharer).start()
739     self.assert_(self.done.get(True, 1))
740     _decoratorlock.release()
741
742   @_Repeat
743   def testExclusiveBlocksExclusive(self):
744     _decoratorlock.acquire()
745     self._addThread(target=self._doItExclusive)
746     # give it a bit of time to check that it's not actually doing anything
747     self.assertRaises(Queue.Empty, self.done.get_nowait)
748     _decoratorlock.release()
749     self._waitThreads()
750     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
751
752   @_Repeat
753   def testExclusiveBlocksSharer(self):
754     _decoratorlock.acquire()
755     self._addThread(target=self._doItSharer)
756     self.assertRaises(Queue.Empty, self.done.get_nowait)
757     _decoratorlock.release()
758     self._waitThreads()
759     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
760
761   @_Repeat
762   def testSharerBlocksExclusive(self):
763     _decoratorlock.acquire(shared=1)
764     self._addThread(target=self._doItExclusive)
765     self.assertRaises(Queue.Empty, self.done.get_nowait)
766     _decoratorlock.release()
767     self._waitThreads()
768     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
769
770
771 class TestLockSet(_ThreadedTestCase):
772   """LockSet tests"""
773
774   def setUp(self):
775     _ThreadedTestCase.setUp(self)
776     self._setUpLS()
777
778   def _setUpLS(self):
779     """Helper to (re)initialize the lock set"""
780     self.resources = ['one', 'two', 'three']
781     self.ls = locking.LockSet(members=self.resources)
782
783   def testResources(self):
784     self.assertEquals(self.ls._names(), set(self.resources))
785     newls = locking.LockSet()
786     self.assertEquals(newls._names(), set())
787
788   def testAcquireRelease(self):
789     self.assert_(self.ls.acquire('one'))
790     self.assertEquals(self.ls._list_owned(), set(['one']))
791     self.ls.release()
792     self.assertEquals(self.ls._list_owned(), set())
793     self.assertEquals(self.ls.acquire(['one']), set(['one']))
794     self.assertEquals(self.ls._list_owned(), set(['one']))
795     self.ls.release()
796     self.assertEquals(self.ls._list_owned(), set())
797     self.ls.acquire(['one', 'two', 'three'])
798     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
799     self.ls.release('one')
800     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
801     self.ls.release(['three'])
802     self.assertEquals(self.ls._list_owned(), set(['two']))
803     self.ls.release()
804     self.assertEquals(self.ls._list_owned(), set())
805     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
806     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
807     self.ls.release()
808     self.assertEquals(self.ls._list_owned(), set())
809
810   def testNoDoubleAcquire(self):
811     self.ls.acquire('one')
812     self.assertRaises(AssertionError, self.ls.acquire, 'one')
813     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
814     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
815     self.ls.release()
816     self.ls.acquire(['one', 'three'])
817     self.ls.release('one')
818     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
819     self.ls.release('three')
820
821   def testNoWrongRelease(self):
822     self.assertRaises(AssertionError, self.ls.release)
823     self.ls.acquire('one')
824     self.assertRaises(AssertionError, self.ls.release, 'two')
825
826   def testAddRemove(self):
827     self.ls.add('four')
828     self.assertEquals(self.ls._list_owned(), set())
829     self.assert_('four' in self.ls._names())
830     self.ls.add(['five', 'six', 'seven'], acquired=1)
831     self.assert_('five' in self.ls._names())
832     self.assert_('six' in self.ls._names())
833     self.assert_('seven' in self.ls._names())
834     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
835     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
836     self.assert_('five' not in self.ls._names())
837     self.assert_('six' not in self.ls._names())
838     self.assertEquals(self.ls._list_owned(), set(['seven']))
839     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
840     self.ls.remove('seven')
841     self.assert_('seven' not in self.ls._names())
842     self.assertEquals(self.ls._list_owned(), set([]))
843     self.ls.acquire(None, shared=1)
844     self.assertRaises(AssertionError, self.ls.add, 'eight')
845     self.ls.release()
846     self.ls.acquire(None)
847     self.ls.add('eight', acquired=1)
848     self.assert_('eight' in self.ls._names())
849     self.assert_('eight' in self.ls._list_owned())
850     self.ls.add('nine')
851     self.assert_('nine' in self.ls._names())
852     self.assert_('nine' not in self.ls._list_owned())
853     self.ls.release()
854     self.ls.remove(['two'])
855     self.assert_('two' not in self.ls._names())
856     self.ls.acquire('three')
857     self.assertEquals(self.ls.remove(['three']), ['three'])
858     self.assert_('three' not in self.ls._names())
859     self.assertEquals(self.ls.remove('three'), [])
860     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
861     self.assert_('one' not in self.ls._names())
862
863   def testRemoveNonBlocking(self):
864     self.ls.acquire('one')
865     self.assertEquals(self.ls.remove('one'), ['one'])
866     self.ls.acquire(['two', 'three'])
867     self.assertEquals(self.ls.remove(['two', 'three']),
868                       ['two', 'three'])
869
870   def testNoDoubleAdd(self):
871     self.assertRaises(errors.LockError, self.ls.add, 'two')
872     self.ls.add('four')
873     self.assertRaises(errors.LockError, self.ls.add, 'four')
874
875   def testNoWrongRemoves(self):
876     self.ls.acquire(['one', 'three'], shared=1)
877     # Cannot remove 'two' while holding something which is not a superset
878     self.assertRaises(AssertionError, self.ls.remove, 'two')
879     # Cannot remove 'three' as we are sharing it
880     self.assertRaises(AssertionError, self.ls.remove, 'three')
881
882   def testAcquireSetLock(self):
883     # acquire the set-lock exclusively
884     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
885     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
886     self.assertEquals(self.ls._is_owned(), True)
887     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
888     # I can still add/remove elements...
889     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
890     self.assert_(self.ls.add('six'))
891     self.ls.release()
892     # share the set-lock
893     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
894     # adding new elements is not possible
895     self.assertRaises(AssertionError, self.ls.add, 'five')
896     self.ls.release()
897
898   def testAcquireWithRepetitions(self):
899     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
900                       set(['two', 'two', 'three']))
901     self.ls.release(['two', 'two'])
902     self.assertEquals(self.ls._list_owned(), set(['three']))
903
904   def testEmptyAcquire(self):
905     # Acquire an empty list of locks...
906     self.assertEquals(self.ls.acquire([]), set())
907     self.assertEquals(self.ls._list_owned(), set())
908     # New locks can still be addded
909     self.assert_(self.ls.add('six'))
910     # "re-acquiring" is not an issue, since we had really acquired nothing
911     self.assertEquals(self.ls.acquire([], shared=1), set())
912     self.assertEquals(self.ls._list_owned(), set())
913     # We haven't really acquired anything, so we cannot release
914     self.assertRaises(AssertionError, self.ls.release)
915
916   def _doLockSet(self, names, shared):
917     try:
918       self.ls.acquire(names, shared=shared)
919       self.done.put('DONE')
920       self.ls.release()
921     except errors.LockError:
922       self.done.put('ERR')
923
924   def _doAddSet(self, names):
925     try:
926       self.ls.add(names, acquired=1)
927       self.done.put('DONE')
928       self.ls.release()
929     except errors.LockError:
930       self.done.put('ERR')
931
932   def _doRemoveSet(self, names):
933     self.done.put(self.ls.remove(names))
934
935   @_Repeat
936   def testConcurrentSharedAcquire(self):
937     self.ls.acquire(['one', 'two'], shared=1)
938     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
939     self._waitThreads()
940     self.assertEqual(self.done.get_nowait(), 'DONE')
941     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
942     self._waitThreads()
943     self.assertEqual(self.done.get_nowait(), 'DONE')
944     self._addThread(target=self._doLockSet, args=('three', 1))
945     self._waitThreads()
946     self.assertEqual(self.done.get_nowait(), 'DONE')
947     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
948     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
949     self.assertRaises(Queue.Empty, self.done.get_nowait)
950     self.ls.release()
951     self._waitThreads()
952     self.assertEqual(self.done.get_nowait(), 'DONE')
953     self.assertEqual(self.done.get_nowait(), 'DONE')
954
955   @_Repeat
956   def testConcurrentExclusiveAcquire(self):
957     self.ls.acquire(['one', 'two'])
958     self._addThread(target=self._doLockSet, args=('three', 1))
959     self._waitThreads()
960     self.assertEqual(self.done.get_nowait(), 'DONE')
961     self._addThread(target=self._doLockSet, args=('three', 0))
962     self._waitThreads()
963     self.assertEqual(self.done.get_nowait(), 'DONE')
964     self.assertRaises(Queue.Empty, self.done.get_nowait)
965     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
966     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
967     self._addThread(target=self._doLockSet, args=('one', 0))
968     self._addThread(target=self._doLockSet, args=('one', 1))
969     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
970     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
971     self.assertRaises(Queue.Empty, self.done.get_nowait)
972     self.ls.release()
973     self._waitThreads()
974     for _ in range(6):
975       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
976
977   @_Repeat
978   def testSimpleAcquireTimeoutExpiring(self):
979     names = sorted(self.ls._names())
980     self.assert_(len(names) >= 3)
981
982     # Get name of first lock
983     first = names[0]
984
985     # Get name of last lock
986     last = names.pop()
987
988     checks = [
989       # Block first and try to lock it again
990       (first, first),
991
992       # Block last and try to lock all locks
993       (None, first),
994
995       # Block last and try to lock it again
996       (last, last),
997       ]
998
999     for (wanted, block) in checks:
1000       # Lock in exclusive mode
1001       self.assert_(self.ls.acquire(block, shared=0))
1002
1003       def _AcquireOne():
1004         # Try to get the same lock again with a timeout (should never succeed)
1005         if self.ls.acquire(wanted, timeout=0.1, shared=0):
1006           self.done.put("acquired")
1007           self.ls.release()
1008         else:
1009           self.assert_(not self.ls._list_owned())
1010           self.assert_(not self.ls._is_owned())
1011           self.done.put("not acquired")
1012
1013       self._addThread(target=_AcquireOne)
1014
1015       # Wait for timeout in thread to expire
1016       self._waitThreads()
1017
1018       # Release exclusive lock again
1019       self.ls.release()
1020
1021       self.assertEqual(self.done.get_nowait(), "not acquired")
1022       self.assertRaises(Queue.Empty, self.done.get_nowait)
1023
1024   @_Repeat
1025   def testDelayedAndExpiringLockAcquire(self):
1026     self._setUpLS()
1027     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1028
1029     for expire in (False, True):
1030       names = sorted(self.ls._names())
1031       self.assertEqual(len(names), 8)
1032
1033       lock_ev = dict([(i, threading.Event()) for i in names])
1034
1035       # Lock all in exclusive mode
1036       self.assert_(self.ls.acquire(names, shared=0))
1037
1038       if expire:
1039         # We'll wait at least 300ms per lock
1040         lockwait = len(names) * [0.3]
1041
1042         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1043         # this gives us up to 2.4s to fail.
1044         lockall_timeout = 0.4
1045       else:
1046         # This should finish rather quickly
1047         lockwait = None
1048         lockall_timeout = len(names) * 5.0
1049
1050       def _LockAll():
1051         def acquire_notification(name):
1052           if not expire:
1053             self.done.put("getting %s" % name)
1054
1055           # Kick next lock
1056           lock_ev[name].set()
1057
1058         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1059                            test_notify=acquire_notification):
1060           self.done.put("got all")
1061           self.ls.release()
1062         else:
1063           self.done.put("timeout on all")
1064
1065         # Notify all locks
1066         for ev in lock_ev.values():
1067           ev.set()
1068
1069       t = self._addThread(target=_LockAll)
1070
1071       for idx, name in enumerate(names):
1072         # Wait for actual acquire on this lock to start
1073         lock_ev[name].wait(10.0)
1074
1075         if expire and t.isAlive():
1076           # Wait some time after getting the notification to make sure the lock
1077           # acquire will expire
1078           SafeSleep(lockwait[idx])
1079
1080         self.ls.release(names=name)
1081
1082       self.assert_(not self.ls._list_owned())
1083
1084       self._waitThreads()
1085
1086       if expire:
1087         # Not checking which locks were actually acquired. Doing so would be
1088         # too timing-dependant.
1089         self.assertEqual(self.done.get_nowait(), "timeout on all")
1090       else:
1091         for i in names:
1092           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1093         self.assertEqual(self.done.get_nowait(), "got all")
1094       self.assertRaises(Queue.Empty, self.done.get_nowait)
1095
1096   @_Repeat
1097   def testConcurrentRemove(self):
1098     self.ls.add('four')
1099     self.ls.acquire(['one', 'two', 'four'])
1100     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1101     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1102     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1103     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1104     self.assertRaises(Queue.Empty, self.done.get_nowait)
1105     self.ls.remove('one')
1106     self.ls.release()
1107     self._waitThreads()
1108     for i in range(4):
1109       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1110     self.ls.add(['five', 'six'], acquired=1)
1111     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1112     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1113     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1114     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1115     self.ls.remove('five')
1116     self.ls.release()
1117     self._waitThreads()
1118     for i in range(4):
1119       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1120     self.ls.acquire(['three', 'four'])
1121     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1122     self.assertRaises(Queue.Empty, self.done.get_nowait)
1123     self.ls.remove('four')
1124     self._waitThreads()
1125     self.assertEqual(self.done.get_nowait(), ['six'])
1126     self._addThread(target=self._doRemoveSet, args=(['two']))
1127     self._waitThreads()
1128     self.assertEqual(self.done.get_nowait(), ['two'])
1129     self.ls.release()
1130     # reset lockset
1131     self._setUpLS()
1132
1133   @_Repeat
1134   def testConcurrentSharedSetLock(self):
1135     # share the set-lock...
1136     self.ls.acquire(None, shared=1)
1137     # ...another thread can share it too
1138     self._addThread(target=self._doLockSet, args=(None, 1))
1139     self._waitThreads()
1140     self.assertEqual(self.done.get_nowait(), 'DONE')
1141     # ...or just share some elements
1142     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1143     self._waitThreads()
1144     self.assertEqual(self.done.get_nowait(), 'DONE')
1145     # ...but not add new ones or remove any
1146     t = self._addThread(target=self._doAddSet, args=(['nine']))
1147     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1148     self.assertRaises(Queue.Empty, self.done.get_nowait)
1149     # this just releases the set-lock
1150     self.ls.release([])
1151     t.join(60)
1152     self.assertEqual(self.done.get_nowait(), 'DONE')
1153     # release the lock on the actual elements so remove() can proceed too
1154     self.ls.release()
1155     self._waitThreads()
1156     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1157     # reset lockset
1158     self._setUpLS()
1159
1160   @_Repeat
1161   def testConcurrentExclusiveSetLock(self):
1162     # acquire the set-lock...
1163     self.ls.acquire(None, shared=0)
1164     # ...no one can do anything else
1165     self._addThread(target=self._doLockSet, args=(None, 1))
1166     self._addThread(target=self._doLockSet, args=(None, 0))
1167     self._addThread(target=self._doLockSet, args=(['three'], 0))
1168     self._addThread(target=self._doLockSet, args=(['two'], 1))
1169     self._addThread(target=self._doAddSet, args=(['nine']))
1170     self.assertRaises(Queue.Empty, self.done.get_nowait)
1171     self.ls.release()
1172     self._waitThreads()
1173     for _ in range(5):
1174       self.assertEqual(self.done.get(True, 1), 'DONE')
1175     # cleanup
1176     self._setUpLS()
1177
1178   @_Repeat
1179   def testConcurrentSetLockAdd(self):
1180     self.ls.acquire('one')
1181     # Another thread wants the whole SetLock
1182     self._addThread(target=self._doLockSet, args=(None, 0))
1183     self._addThread(target=self._doLockSet, args=(None, 1))
1184     self.assertRaises(Queue.Empty, self.done.get_nowait)
1185     self.assertRaises(AssertionError, self.ls.add, 'four')
1186     self.ls.release()
1187     self._waitThreads()
1188     self.assertEqual(self.done.get_nowait(), 'DONE')
1189     self.assertEqual(self.done.get_nowait(), 'DONE')
1190     self.ls.acquire(None)
1191     self._addThread(target=self._doLockSet, args=(None, 0))
1192     self._addThread(target=self._doLockSet, args=(None, 1))
1193     self.assertRaises(Queue.Empty, self.done.get_nowait)
1194     self.ls.add('four')
1195     self.ls.add('five', acquired=1)
1196     self.ls.add('six', acquired=1, shared=1)
1197     self.assertEquals(self.ls._list_owned(),
1198       set(['one', 'two', 'three', 'five', 'six']))
1199     self.assertEquals(self.ls._is_owned(), True)
1200     self.assertEquals(self.ls._names(),
1201       set(['one', 'two', 'three', 'four', 'five', 'six']))
1202     self.ls.release()
1203     self._waitThreads()
1204     self.assertEqual(self.done.get_nowait(), 'DONE')
1205     self.assertEqual(self.done.get_nowait(), 'DONE')
1206     self._setUpLS()
1207
1208   @_Repeat
1209   def testEmptyLockSet(self):
1210     # get the set-lock
1211     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1212     # now empty it...
1213     self.ls.remove(['one', 'two', 'three'])
1214     # and adds/locks by another thread still wait
1215     self._addThread(target=self._doAddSet, args=(['nine']))
1216     self._addThread(target=self._doLockSet, args=(None, 1))
1217     self._addThread(target=self._doLockSet, args=(None, 0))
1218     self.assertRaises(Queue.Empty, self.done.get_nowait)
1219     self.ls.release()
1220     self._waitThreads()
1221     for _ in range(3):
1222       self.assertEqual(self.done.get_nowait(), 'DONE')
1223     # empty it again...
1224     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1225     # now share it...
1226     self.assertEqual(self.ls.acquire(None, shared=1), set())
1227     # other sharers can go, adds still wait
1228     self._addThread(target=self._doLockSet, args=(None, 1))
1229     self._waitThreads()
1230     self.assertEqual(self.done.get_nowait(), 'DONE')
1231     self._addThread(target=self._doAddSet, args=(['nine']))
1232     self.assertRaises(Queue.Empty, self.done.get_nowait)
1233     self.ls.release()
1234     self._waitThreads()
1235     self.assertEqual(self.done.get_nowait(), 'DONE')
1236     self._setUpLS()
1237
1238
1239 class TestGanetiLockManager(_ThreadedTestCase):
1240
1241   def setUp(self):
1242     _ThreadedTestCase.setUp(self)
1243     self.nodes=['n1', 'n2']
1244     self.instances=['i1', 'i2', 'i3']
1245     self.GL = locking.GanetiLockManager(nodes=self.nodes,
1246                                         instances=self.instances)
1247
1248   def tearDown(self):
1249     # Don't try this at home...
1250     locking.GanetiLockManager._instance = None
1251
1252   def testLockingConstants(self):
1253     # The locking library internally cheats by assuming its constants have some
1254     # relationships with each other. Check those hold true.
1255     # This relationship is also used in the Processor to recursively acquire
1256     # the right locks. Again, please don't break it.
1257     for i in range(len(locking.LEVELS)):
1258       self.assertEqual(i, locking.LEVELS[i])
1259
1260   def testDoubleGLFails(self):
1261     self.assertRaises(AssertionError, locking.GanetiLockManager)
1262
1263   def testLockNames(self):
1264     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1265     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1266     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1267                      set(self.instances))
1268
1269   def testInitAndResources(self):
1270     locking.GanetiLockManager._instance = None
1271     self.GL = locking.GanetiLockManager()
1272     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1273     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1274     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1275
1276     locking.GanetiLockManager._instance = None
1277     self.GL = locking.GanetiLockManager(nodes=self.nodes)
1278     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1279     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1280     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1281
1282     locking.GanetiLockManager._instance = None
1283     self.GL = locking.GanetiLockManager(instances=self.instances)
1284     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1285     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1286     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1287                      set(self.instances))
1288
1289   def testAcquireRelease(self):
1290     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1291     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1292     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1293     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1294     self.GL.release(locking.LEVEL_NODE, ['n2'])
1295     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1296     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1297     self.GL.release(locking.LEVEL_NODE)
1298     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1299     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1300     self.GL.release(locking.LEVEL_INSTANCE)
1301     self.assertRaises(errors.LockError, self.GL.acquire,
1302                       locking.LEVEL_INSTANCE, ['i5'])
1303     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1304     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1305
1306   def testAcquireWholeSets(self):
1307     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1308     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1309                       set(self.instances))
1310     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1311                       set(self.instances))
1312     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1313                       set(self.nodes))
1314     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1315                       set(self.nodes))
1316     self.GL.release(locking.LEVEL_NODE)
1317     self.GL.release(locking.LEVEL_INSTANCE)
1318     self.GL.release(locking.LEVEL_CLUSTER)
1319
1320   def testAcquireWholeAndPartial(self):
1321     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1322     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1323                       set(self.instances))
1324     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1325                       set(self.instances))
1326     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1327                       set(['n2']))
1328     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1329                       set(['n2']))
1330     self.GL.release(locking.LEVEL_NODE)
1331     self.GL.release(locking.LEVEL_INSTANCE)
1332     self.GL.release(locking.LEVEL_CLUSTER)
1333
1334   def testBGLDependency(self):
1335     self.assertRaises(AssertionError, self.GL.acquire,
1336                       locking.LEVEL_NODE, ['n1', 'n2'])
1337     self.assertRaises(AssertionError, self.GL.acquire,
1338                       locking.LEVEL_INSTANCE, ['i3'])
1339     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1340     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1341     self.assertRaises(AssertionError, self.GL.release,
1342                       locking.LEVEL_CLUSTER, ['BGL'])
1343     self.assertRaises(AssertionError, self.GL.release,
1344                       locking.LEVEL_CLUSTER)
1345     self.GL.release(locking.LEVEL_NODE)
1346     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1347     self.assertRaises(AssertionError, self.GL.release,
1348                       locking.LEVEL_CLUSTER, ['BGL'])
1349     self.assertRaises(AssertionError, self.GL.release,
1350                       locking.LEVEL_CLUSTER)
1351     self.GL.release(locking.LEVEL_INSTANCE)
1352
1353   def testWrongOrder(self):
1354     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1355     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1356     self.assertRaises(AssertionError, self.GL.acquire,
1357                       locking.LEVEL_NODE, ['n1'])
1358     self.assertRaises(AssertionError, self.GL.acquire,
1359                       locking.LEVEL_INSTANCE, ['i2'])
1360
1361   # Helper function to run as a thread that shared the BGL and then acquires
1362   # some locks at another level.
1363   def _doLock(self, level, names, shared):
1364     try:
1365       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1366       self.GL.acquire(level, names, shared=shared)
1367       self.done.put('DONE')
1368       self.GL.release(level)
1369       self.GL.release(locking.LEVEL_CLUSTER)
1370     except errors.LockError:
1371       self.done.put('ERR')
1372
1373   @_Repeat
1374   def testConcurrency(self):
1375     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1376     self._addThread(target=self._doLock,
1377                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1378     self._waitThreads()
1379     self.assertEqual(self.done.get_nowait(), 'DONE')
1380     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1381     self._addThread(target=self._doLock,
1382                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1383     self._waitThreads()
1384     self.assertEqual(self.done.get_nowait(), 'DONE')
1385     self._addThread(target=self._doLock,
1386                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1387     self.assertRaises(Queue.Empty, self.done.get_nowait)
1388     self.GL.release(locking.LEVEL_INSTANCE)
1389     self._waitThreads()
1390     self.assertEqual(self.done.get_nowait(), 'DONE')
1391     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1392     self._addThread(target=self._doLock,
1393                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1394     self._waitThreads()
1395     self.assertEqual(self.done.get_nowait(), 'DONE')
1396     self._addThread(target=self._doLock,
1397                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1398     self.assertRaises(Queue.Empty, self.done.get_nowait)
1399     self.GL.release(locking.LEVEL_INSTANCE)
1400     self._waitThreads()
1401     self.assertEqual(self.done.get(True, 1), 'DONE')
1402     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1403
1404
1405 if __name__ == '__main__':
1406   unittest.main()
1407   #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1408   #unittest.TextTestRunner(verbosity=2).run(suite)