Add simple lock monitor
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31
32 from ganeti import locking
33 from ganeti import errors
34 from ganeti import utils
35
36 import testutils
37
38
39 # This is used to test the ssynchronize decorator.
40 # Since it's passed as input to a decorator it must be declared as a global.
41 _decoratorlock = locking.SharedLock("decorator lock")
42
43 #: List for looping tests
44 ITERATIONS = range(8)
45
46
47 def _Repeat(fn):
48   """Decorator for executing a function many times"""
49   def wrapper(*args, **kwargs):
50     for i in ITERATIONS:
51       fn(*args, **kwargs)
52   return wrapper
53
54
55 def SafeSleep(duration):
56   start = time.time()
57   while True:
58     delay = start + duration - time.time()
59     if delay <= 0.0:
60       break
61     time.sleep(delay)
62
63
64 class _ThreadedTestCase(unittest.TestCase):
65   """Test class that supports adding/waiting on threads"""
66   def setUp(self):
67     unittest.TestCase.setUp(self)
68     self.done = Queue.Queue(0)
69     self.threads = []
70
71   def _addThread(self, *args, **kwargs):
72     """Create and remember a new thread"""
73     t = threading.Thread(*args, **kwargs)
74     self.threads.append(t)
75     t.start()
76     return t
77
78   def _waitThreads(self):
79     """Wait for all our threads to finish"""
80     for t in self.threads:
81       t.join(60)
82       self.failIf(t.isAlive())
83     self.threads = []
84
85
86 class _ConditionTestCase(_ThreadedTestCase):
87   """Common test case for conditions"""
88
89   def setUp(self, cls):
90     _ThreadedTestCase.setUp(self)
91     self.lock = threading.Lock()
92     self.cond = cls(self.lock)
93
94   def _testAcquireRelease(self):
95     self.assertFalse(self.cond._is_owned())
96     self.assertRaises(RuntimeError, self.cond.wait)
97     self.assertRaises(RuntimeError, self.cond.notifyAll)
98
99     self.cond.acquire()
100     self.assert_(self.cond._is_owned())
101     self.cond.notifyAll()
102     self.assert_(self.cond._is_owned())
103     self.cond.release()
104
105     self.assertFalse(self.cond._is_owned())
106     self.assertRaises(RuntimeError, self.cond.wait)
107     self.assertRaises(RuntimeError, self.cond.notifyAll)
108
109   def _testNotification(self):
110     def _NotifyAll():
111       self.done.put("NE")
112       self.cond.acquire()
113       self.done.put("NA")
114       self.cond.notifyAll()
115       self.done.put("NN")
116       self.cond.release()
117
118     self.cond.acquire()
119     self._addThread(target=_NotifyAll)
120     self.assertEqual(self.done.get(True, 1), "NE")
121     self.assertRaises(Queue.Empty, self.done.get_nowait)
122     self.cond.wait()
123     self.assertEqual(self.done.get(True, 1), "NA")
124     self.assertEqual(self.done.get(True, 1), "NN")
125     self.assert_(self.cond._is_owned())
126     self.cond.release()
127     self.assertFalse(self.cond._is_owned())
128
129
130 class TestSingleNotifyPipeCondition(_ConditionTestCase):
131   """SingleNotifyPipeCondition tests"""
132
133   def setUp(self):
134     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
135
136   def testAcquireRelease(self):
137     self._testAcquireRelease()
138
139   def testNotification(self):
140     self._testNotification()
141
142   def testWaitReuse(self):
143     self.cond.acquire()
144     self.cond.wait(0)
145     self.cond.wait(0.1)
146     self.cond.release()
147
148   def testNoNotifyReuse(self):
149     self.cond.acquire()
150     self.cond.notifyAll()
151     self.assertRaises(RuntimeError, self.cond.wait)
152     self.assertRaises(RuntimeError, self.cond.notifyAll)
153     self.cond.release()
154
155
156 class TestPipeCondition(_ConditionTestCase):
157   """PipeCondition tests"""
158
159   def setUp(self):
160     _ConditionTestCase.setUp(self, locking.PipeCondition)
161
162   def testAcquireRelease(self):
163     self._testAcquireRelease()
164
165   def testNotification(self):
166     self._testNotification()
167
168   def _TestWait(self, fn):
169     self._addThread(target=fn)
170     self._addThread(target=fn)
171     self._addThread(target=fn)
172
173     # Wait for threads to be waiting
174     self.assertEqual(self.done.get(True, 1), "A")
175     self.assertEqual(self.done.get(True, 1), "A")
176     self.assertEqual(self.done.get(True, 1), "A")
177
178     self.assertRaises(Queue.Empty, self.done.get_nowait)
179
180     self.cond.acquire()
181     self.assertEqual(self.cond._nwaiters, 3)
182     # This new thread can"t acquire the lock, and thus call wait, before we
183     # release it
184     self._addThread(target=fn)
185     self.cond.notifyAll()
186     self.assertRaises(Queue.Empty, self.done.get_nowait)
187     self.cond.release()
188
189     # We should now get 3 W and 1 A (for the new thread) in whatever order
190     w = 0
191     a = 0
192     for i in range(4):
193       got = self.done.get(True, 1)
194       if got == "W":
195         w += 1
196       elif got == "A":
197         a += 1
198       else:
199         self.fail("Got %s on the done queue" % got)
200
201     self.assertEqual(w, 3)
202     self.assertEqual(a, 1)
203
204     self.cond.acquire()
205     self.cond.notifyAll()
206     self.cond.release()
207     self._waitThreads()
208     self.assertEqual(self.done.get_nowait(), "W")
209     self.assertRaises(Queue.Empty, self.done.get_nowait)
210
211   def testBlockingWait(self):
212     def _BlockingWait():
213       self.cond.acquire()
214       self.done.put("A")
215       self.cond.wait()
216       self.cond.release()
217       self.done.put("W")
218
219     self._TestWait(_BlockingWait)
220
221   def testLongTimeoutWait(self):
222     def _Helper():
223       self.cond.acquire()
224       self.done.put("A")
225       self.cond.wait(15.0)
226       self.cond.release()
227       self.done.put("W")
228
229     self._TestWait(_Helper)
230
231   def _TimeoutWait(self, timeout, check):
232     self.cond.acquire()
233     self.cond.wait(timeout)
234     self.cond.release()
235     self.done.put(check)
236
237   def testShortTimeoutWait(self):
238     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
239     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
240     self._waitThreads()
241     self.assertEqual(self.done.get_nowait(), "T1")
242     self.assertEqual(self.done.get_nowait(), "T1")
243     self.assertRaises(Queue.Empty, self.done.get_nowait)
244
245   def testZeroTimeoutWait(self):
246     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
247     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
248     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
249     self._waitThreads()
250     self.assertEqual(self.done.get_nowait(), "T0")
251     self.assertEqual(self.done.get_nowait(), "T0")
252     self.assertEqual(self.done.get_nowait(), "T0")
253     self.assertRaises(Queue.Empty, self.done.get_nowait)
254
255
256 class TestSharedLock(_ThreadedTestCase):
257   """SharedLock tests"""
258
259   def setUp(self):
260     _ThreadedTestCase.setUp(self)
261     self.sl = locking.SharedLock("TestSharedLock")
262
263   def testSequenceAndOwnership(self):
264     self.assertFalse(self.sl._is_owned())
265     self.sl.acquire(shared=1)
266     self.assert_(self.sl._is_owned())
267     self.assert_(self.sl._is_owned(shared=1))
268     self.assertFalse(self.sl._is_owned(shared=0))
269     self.sl.release()
270     self.assertFalse(self.sl._is_owned())
271     self.sl.acquire()
272     self.assert_(self.sl._is_owned())
273     self.assertFalse(self.sl._is_owned(shared=1))
274     self.assert_(self.sl._is_owned(shared=0))
275     self.sl.release()
276     self.assertFalse(self.sl._is_owned())
277     self.sl.acquire(shared=1)
278     self.assert_(self.sl._is_owned())
279     self.assert_(self.sl._is_owned(shared=1))
280     self.assertFalse(self.sl._is_owned(shared=0))
281     self.sl.release()
282     self.assertFalse(self.sl._is_owned())
283
284   def testBooleanValue(self):
285     # semaphores are supposed to return a true value on a successful acquire
286     self.assert_(self.sl.acquire(shared=1))
287     self.sl.release()
288     self.assert_(self.sl.acquire())
289     self.sl.release()
290
291   def testDoubleLockingStoE(self):
292     self.sl.acquire(shared=1)
293     self.assertRaises(AssertionError, self.sl.acquire)
294
295   def testDoubleLockingEtoS(self):
296     self.sl.acquire()
297     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
298
299   def testDoubleLockingStoS(self):
300     self.sl.acquire(shared=1)
301     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
302
303   def testDoubleLockingEtoE(self):
304     self.sl.acquire()
305     self.assertRaises(AssertionError, self.sl.acquire)
306
307   # helper functions: called in a separate thread they acquire the lock, send
308   # their identifier on the done queue, then release it.
309   def _doItSharer(self):
310     try:
311       self.sl.acquire(shared=1)
312       self.done.put('SHR')
313       self.sl.release()
314     except errors.LockError:
315       self.done.put('ERR')
316
317   def _doItExclusive(self):
318     try:
319       self.sl.acquire()
320       self.done.put('EXC')
321       self.sl.release()
322     except errors.LockError:
323       self.done.put('ERR')
324
325   def _doItDelete(self):
326     try:
327       self.sl.delete()
328       self.done.put('DEL')
329     except errors.LockError:
330       self.done.put('ERR')
331
332   def testSharersCanCoexist(self):
333     self.sl.acquire(shared=1)
334     threading.Thread(target=self._doItSharer).start()
335     self.assert_(self.done.get(True, 1))
336     self.sl.release()
337
338   @_Repeat
339   def testExclusiveBlocksExclusive(self):
340     self.sl.acquire()
341     self._addThread(target=self._doItExclusive)
342     self.assertRaises(Queue.Empty, self.done.get_nowait)
343     self.sl.release()
344     self._waitThreads()
345     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
346
347   @_Repeat
348   def testExclusiveBlocksDelete(self):
349     self.sl.acquire()
350     self._addThread(target=self._doItDelete)
351     self.assertRaises(Queue.Empty, self.done.get_nowait)
352     self.sl.release()
353     self._waitThreads()
354     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
355     self.sl = locking.SharedLock(self.sl.name)
356
357   @_Repeat
358   def testExclusiveBlocksSharer(self):
359     self.sl.acquire()
360     self._addThread(target=self._doItSharer)
361     self.assertRaises(Queue.Empty, self.done.get_nowait)
362     self.sl.release()
363     self._waitThreads()
364     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
365
366   @_Repeat
367   def testSharerBlocksExclusive(self):
368     self.sl.acquire(shared=1)
369     self._addThread(target=self._doItExclusive)
370     self.assertRaises(Queue.Empty, self.done.get_nowait)
371     self.sl.release()
372     self._waitThreads()
373     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
374
375   @_Repeat
376   def testSharerBlocksDelete(self):
377     self.sl.acquire(shared=1)
378     self._addThread(target=self._doItDelete)
379     self.assertRaises(Queue.Empty, self.done.get_nowait)
380     self.sl.release()
381     self._waitThreads()
382     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
383     self.sl = locking.SharedLock(self.sl.name)
384
385   @_Repeat
386   def testWaitingExclusiveBlocksSharer(self):
387     """SKIPPED testWaitingExclusiveBlockSharer"""
388     return
389
390     self.sl.acquire(shared=1)
391     # the lock is acquired in shared mode...
392     self._addThread(target=self._doItExclusive)
393     # ...but now an exclusive is waiting...
394     self._addThread(target=self._doItSharer)
395     # ...so the sharer should be blocked as well
396     self.assertRaises(Queue.Empty, self.done.get_nowait)
397     self.sl.release()
398     self._waitThreads()
399     # The exclusive passed before
400     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
401     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
402
403   @_Repeat
404   def testWaitingSharerBlocksExclusive(self):
405     """SKIPPED testWaitingSharerBlocksExclusive"""
406     return
407
408     self.sl.acquire()
409     # the lock is acquired in exclusive mode...
410     self._addThread(target=self._doItSharer)
411     # ...but now a sharer is waiting...
412     self._addThread(target=self._doItExclusive)
413     # ...the exclusive is waiting too...
414     self.assertRaises(Queue.Empty, self.done.get_nowait)
415     self.sl.release()
416     self._waitThreads()
417     # The sharer passed before
418     self.assertEqual(self.done.get_nowait(), 'SHR')
419     self.assertEqual(self.done.get_nowait(), 'EXC')
420
421   def testDelete(self):
422     self.sl.delete()
423     self.assertRaises(errors.LockError, self.sl.acquire)
424     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
425     self.assertRaises(errors.LockError, self.sl.delete)
426
427   def testDeleteTimeout(self):
428     self.sl.delete(timeout=60)
429
430   def testNoDeleteIfSharer(self):
431     self.sl.acquire(shared=1)
432     self.assertRaises(AssertionError, self.sl.delete)
433
434   @_Repeat
435   def testDeletePendingSharersExclusiveDelete(self):
436     self.sl.acquire()
437     self._addThread(target=self._doItSharer)
438     self._addThread(target=self._doItSharer)
439     self._addThread(target=self._doItExclusive)
440     self._addThread(target=self._doItDelete)
441     self.sl.delete()
442     self._waitThreads()
443     # The threads who were pending return ERR
444     for _ in range(4):
445       self.assertEqual(self.done.get_nowait(), 'ERR')
446     self.sl = locking.SharedLock(self.sl.name)
447
448   @_Repeat
449   def testDeletePendingDeleteExclusiveSharers(self):
450     self.sl.acquire()
451     self._addThread(target=self._doItDelete)
452     self._addThread(target=self._doItExclusive)
453     self._addThread(target=self._doItSharer)
454     self._addThread(target=self._doItSharer)
455     self.sl.delete()
456     self._waitThreads()
457     # The two threads who were pending return both ERR
458     self.assertEqual(self.done.get_nowait(), 'ERR')
459     self.assertEqual(self.done.get_nowait(), 'ERR')
460     self.assertEqual(self.done.get_nowait(), 'ERR')
461     self.assertEqual(self.done.get_nowait(), 'ERR')
462     self.sl = locking.SharedLock(self.sl.name)
463
464   @_Repeat
465   def testExclusiveAcquireTimeout(self):
466     for shared in [0, 1]:
467       on_queue = threading.Event()
468       release_exclusive = threading.Event()
469
470       def _LockExclusive():
471         self.sl.acquire(shared=0, test_notify=on_queue.set)
472         self.done.put("A: start wait")
473         release_exclusive.wait()
474         self.done.put("A: end wait")
475         self.sl.release()
476
477       # Start thread to hold lock in exclusive mode
478       self._addThread(target=_LockExclusive)
479
480       # Wait for wait to begin
481       self.assertEqual(self.done.get(timeout=60), "A: start wait")
482
483       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
484       # on the queue
485       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
486                                       test_notify=release_exclusive.set))
487
488       self.done.put("got 2nd")
489       self.sl.release()
490
491       self._waitThreads()
492
493       self.assertEqual(self.done.get_nowait(), "A: end wait")
494       self.assertEqual(self.done.get_nowait(), "got 2nd")
495       self.assertRaises(Queue.Empty, self.done.get_nowait)
496
497   @_Repeat
498   def testAcquireExpiringTimeout(self):
499     def _AcquireWithTimeout(shared, timeout):
500       if not self.sl.acquire(shared=shared, timeout=timeout):
501         self.done.put("timeout")
502
503     for shared in [0, 1]:
504       # Lock exclusively
505       self.sl.acquire()
506
507       # Start shared acquires with timeout between 0 and 20 ms
508       for i in range(11):
509         self._addThread(target=_AcquireWithTimeout,
510                         args=(shared, i * 2.0 / 1000.0))
511
512       # Wait for threads to finish (makes sure the acquire timeout expires
513       # before releasing the lock)
514       self._waitThreads()
515
516       # Release lock
517       self.sl.release()
518
519       for _ in range(11):
520         self.assertEqual(self.done.get_nowait(), "timeout")
521
522       self.assertRaises(Queue.Empty, self.done.get_nowait)
523
524   @_Repeat
525   def testSharedSkipExclusiveAcquires(self):
526     # Tests whether shared acquires jump in front of exclusive acquires in the
527     # queue.
528
529     def _Acquire(shared, name, notify_ev, wait_ev):
530       if notify_ev:
531         notify_fn = notify_ev.set
532       else:
533         notify_fn = None
534
535       if wait_ev:
536         wait_ev.wait()
537
538       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
539         return
540
541       self.done.put(name)
542       self.sl.release()
543
544     # Get exclusive lock while we fill the queue
545     self.sl.acquire()
546
547     shrcnt1 = 5
548     shrcnt2 = 7
549     shrcnt3 = 9
550     shrcnt4 = 2
551
552     # Add acquires using threading.Event for synchronization. They'll be
553     # acquired exactly in the order defined in this list.
554     acquires = (shrcnt1 * [(1, "shared 1")] +
555                 3 * [(0, "exclusive 1")] +
556                 shrcnt2 * [(1, "shared 2")] +
557                 shrcnt3 * [(1, "shared 3")] +
558                 shrcnt4 * [(1, "shared 4")] +
559                 3 * [(0, "exclusive 2")])
560
561     ev_cur = None
562     ev_prev = None
563
564     for args in acquires:
565       ev_cur = threading.Event()
566       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
567       ev_prev = ev_cur
568
569     # Wait for last acquire to start
570     ev_prev.wait()
571
572     # Expect 6 pending exclusive acquires and 1 for all shared acquires
573     # together
574     self.assertEqual(self.sl._count_pending(), 7)
575
576     # Release exclusive lock and wait
577     self.sl.release()
578
579     self._waitThreads()
580
581     # Check sequence
582     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
583       # Shared locks aren't guaranteed to be notified in order, but they'll be
584       # first
585       tmp = self.done.get_nowait()
586       if tmp == "shared 1":
587         shrcnt1 -= 1
588       elif tmp == "shared 2":
589         shrcnt2 -= 1
590       elif tmp == "shared 3":
591         shrcnt3 -= 1
592       elif tmp == "shared 4":
593         shrcnt4 -= 1
594     self.assertEqual(shrcnt1, 0)
595     self.assertEqual(shrcnt2, 0)
596     self.assertEqual(shrcnt3, 0)
597     self.assertEqual(shrcnt3, 0)
598
599     for _ in range(3):
600       self.assertEqual(self.done.get_nowait(), "exclusive 1")
601
602     for _ in range(3):
603       self.assertEqual(self.done.get_nowait(), "exclusive 2")
604
605     self.assertRaises(Queue.Empty, self.done.get_nowait)
606
607   @_Repeat
608   def testMixedAcquireTimeout(self):
609     sync = threading.Event()
610
611     def _AcquireShared(ev):
612       if not self.sl.acquire(shared=1, timeout=None):
613         return
614
615       self.done.put("shared")
616
617       # Notify main thread
618       ev.set()
619
620       # Wait for notification from main thread
621       sync.wait()
622
623       # Release lock
624       self.sl.release()
625
626     acquires = []
627     for _ in range(3):
628       ev = threading.Event()
629       self._addThread(target=_AcquireShared, args=(ev, ))
630       acquires.append(ev)
631
632     # Wait for all acquires to finish
633     for i in acquires:
634       i.wait()
635
636     self.assertEqual(self.sl._count_pending(), 0)
637
638     # Try to get exclusive lock
639     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
640
641     # Acquire exclusive without timeout
642     exclsync = threading.Event()
643     exclev = threading.Event()
644
645     def _AcquireExclusive():
646       if not self.sl.acquire(shared=0):
647         return
648
649       self.done.put("exclusive")
650
651       # Notify main thread
652       exclev.set()
653
654       # Wait for notification from main thread
655       exclsync.wait()
656
657       self.sl.release()
658
659     self._addThread(target=_AcquireExclusive)
660
661     # Try to get exclusive lock
662     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
663
664     # Make all shared holders release their locks
665     sync.set()
666
667     # Wait for exclusive acquire to succeed
668     exclev.wait()
669
670     self.assertEqual(self.sl._count_pending(), 0)
671
672     # Try to get exclusive lock
673     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
674
675     def _AcquireSharedSimple():
676       if self.sl.acquire(shared=1, timeout=None):
677         self.done.put("shared2")
678         self.sl.release()
679
680     for _ in range(10):
681       self._addThread(target=_AcquireSharedSimple)
682
683     # Tell exclusive lock to release
684     exclsync.set()
685
686     # Wait for everything to finish
687     self._waitThreads()
688
689     self.assertEqual(self.sl._count_pending(), 0)
690
691     # Check sequence
692     for _ in range(3):
693       self.assertEqual(self.done.get_nowait(), "shared")
694
695     self.assertEqual(self.done.get_nowait(), "exclusive")
696
697     for _ in range(10):
698       self.assertEqual(self.done.get_nowait(), "shared2")
699
700     self.assertRaises(Queue.Empty, self.done.get_nowait)
701
702
703 class TestSharedLockInCondition(_ThreadedTestCase):
704   """SharedLock as a condition lock tests"""
705
706   def setUp(self):
707     _ThreadedTestCase.setUp(self)
708     self.sl = locking.SharedLock("TestSharedLockInCondition")
709     self.setCondition()
710
711   def setCondition(self):
712     self.cond = threading.Condition(self.sl)
713
714   def testKeepMode(self):
715     self.cond.acquire(shared=1)
716     self.assert_(self.sl._is_owned(shared=1))
717     self.cond.wait(0)
718     self.assert_(self.sl._is_owned(shared=1))
719     self.cond.release()
720     self.cond.acquire(shared=0)
721     self.assert_(self.sl._is_owned(shared=0))
722     self.cond.wait(0)
723     self.assert_(self.sl._is_owned(shared=0))
724     self.cond.release()
725
726
727 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
728   """SharedLock as a pipe condition lock tests"""
729
730   def setCondition(self):
731     self.cond = locking.PipeCondition(self.sl)
732
733
734 class TestSSynchronizedDecorator(_ThreadedTestCase):
735   """Shared Lock Synchronized decorator test"""
736
737   def setUp(self):
738     _ThreadedTestCase.setUp(self)
739
740   @locking.ssynchronized(_decoratorlock)
741   def _doItExclusive(self):
742     self.assert_(_decoratorlock._is_owned())
743     self.done.put('EXC')
744
745   @locking.ssynchronized(_decoratorlock, shared=1)
746   def _doItSharer(self):
747     self.assert_(_decoratorlock._is_owned(shared=1))
748     self.done.put('SHR')
749
750   def testDecoratedFunctions(self):
751     self._doItExclusive()
752     self.assertFalse(_decoratorlock._is_owned())
753     self._doItSharer()
754     self.assertFalse(_decoratorlock._is_owned())
755
756   def testSharersCanCoexist(self):
757     _decoratorlock.acquire(shared=1)
758     threading.Thread(target=self._doItSharer).start()
759     self.assert_(self.done.get(True, 1))
760     _decoratorlock.release()
761
762   @_Repeat
763   def testExclusiveBlocksExclusive(self):
764     _decoratorlock.acquire()
765     self._addThread(target=self._doItExclusive)
766     # give it a bit of time to check that it's not actually doing anything
767     self.assertRaises(Queue.Empty, self.done.get_nowait)
768     _decoratorlock.release()
769     self._waitThreads()
770     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
771
772   @_Repeat
773   def testExclusiveBlocksSharer(self):
774     _decoratorlock.acquire()
775     self._addThread(target=self._doItSharer)
776     self.assertRaises(Queue.Empty, self.done.get_nowait)
777     _decoratorlock.release()
778     self._waitThreads()
779     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
780
781   @_Repeat
782   def testSharerBlocksExclusive(self):
783     _decoratorlock.acquire(shared=1)
784     self._addThread(target=self._doItExclusive)
785     self.assertRaises(Queue.Empty, self.done.get_nowait)
786     _decoratorlock.release()
787     self._waitThreads()
788     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
789
790
791 class TestLockSet(_ThreadedTestCase):
792   """LockSet tests"""
793
794   def setUp(self):
795     _ThreadedTestCase.setUp(self)
796     self._setUpLS()
797
798   def _setUpLS(self):
799     """Helper to (re)initialize the lock set"""
800     self.resources = ['one', 'two', 'three']
801     self.ls = locking.LockSet(self.resources, "TestLockSet")
802
803   def testResources(self):
804     self.assertEquals(self.ls._names(), set(self.resources))
805     newls = locking.LockSet([], "TestLockSet.testResources")
806     self.assertEquals(newls._names(), set())
807
808   def testAcquireRelease(self):
809     self.assert_(self.ls.acquire('one'))
810     self.assertEquals(self.ls._list_owned(), set(['one']))
811     self.ls.release()
812     self.assertEquals(self.ls._list_owned(), set())
813     self.assertEquals(self.ls.acquire(['one']), set(['one']))
814     self.assertEquals(self.ls._list_owned(), set(['one']))
815     self.ls.release()
816     self.assertEquals(self.ls._list_owned(), set())
817     self.ls.acquire(['one', 'two', 'three'])
818     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
819     self.ls.release('one')
820     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
821     self.ls.release(['three'])
822     self.assertEquals(self.ls._list_owned(), set(['two']))
823     self.ls.release()
824     self.assertEquals(self.ls._list_owned(), set())
825     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
826     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
827     self.ls.release()
828     self.assertEquals(self.ls._list_owned(), set())
829
830   def testNoDoubleAcquire(self):
831     self.ls.acquire('one')
832     self.assertRaises(AssertionError, self.ls.acquire, 'one')
833     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
834     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
835     self.ls.release()
836     self.ls.acquire(['one', 'three'])
837     self.ls.release('one')
838     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
839     self.ls.release('three')
840
841   def testNoWrongRelease(self):
842     self.assertRaises(AssertionError, self.ls.release)
843     self.ls.acquire('one')
844     self.assertRaises(AssertionError, self.ls.release, 'two')
845
846   def testAddRemove(self):
847     self.ls.add('four')
848     self.assertEquals(self.ls._list_owned(), set())
849     self.assert_('four' in self.ls._names())
850     self.ls.add(['five', 'six', 'seven'], acquired=1)
851     self.assert_('five' in self.ls._names())
852     self.assert_('six' in self.ls._names())
853     self.assert_('seven' in self.ls._names())
854     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
855     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
856     self.assert_('five' not in self.ls._names())
857     self.assert_('six' not in self.ls._names())
858     self.assertEquals(self.ls._list_owned(), set(['seven']))
859     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
860     self.ls.remove('seven')
861     self.assert_('seven' not in self.ls._names())
862     self.assertEquals(self.ls._list_owned(), set([]))
863     self.ls.acquire(None, shared=1)
864     self.assertRaises(AssertionError, self.ls.add, 'eight')
865     self.ls.release()
866     self.ls.acquire(None)
867     self.ls.add('eight', acquired=1)
868     self.assert_('eight' in self.ls._names())
869     self.assert_('eight' in self.ls._list_owned())
870     self.ls.add('nine')
871     self.assert_('nine' in self.ls._names())
872     self.assert_('nine' not in self.ls._list_owned())
873     self.ls.release()
874     self.ls.remove(['two'])
875     self.assert_('two' not in self.ls._names())
876     self.ls.acquire('three')
877     self.assertEquals(self.ls.remove(['three']), ['three'])
878     self.assert_('three' not in self.ls._names())
879     self.assertEquals(self.ls.remove('three'), [])
880     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
881     self.assert_('one' not in self.ls._names())
882
883   def testRemoveNonBlocking(self):
884     self.ls.acquire('one')
885     self.assertEquals(self.ls.remove('one'), ['one'])
886     self.ls.acquire(['two', 'three'])
887     self.assertEquals(self.ls.remove(['two', 'three']),
888                       ['two', 'three'])
889
890   def testNoDoubleAdd(self):
891     self.assertRaises(errors.LockError, self.ls.add, 'two')
892     self.ls.add('four')
893     self.assertRaises(errors.LockError, self.ls.add, 'four')
894
895   def testNoWrongRemoves(self):
896     self.ls.acquire(['one', 'three'], shared=1)
897     # Cannot remove 'two' while holding something which is not a superset
898     self.assertRaises(AssertionError, self.ls.remove, 'two')
899     # Cannot remove 'three' as we are sharing it
900     self.assertRaises(AssertionError, self.ls.remove, 'three')
901
902   def testAcquireSetLock(self):
903     # acquire the set-lock exclusively
904     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
905     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
906     self.assertEquals(self.ls._is_owned(), True)
907     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
908     # I can still add/remove elements...
909     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
910     self.assert_(self.ls.add('six'))
911     self.ls.release()
912     # share the set-lock
913     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
914     # adding new elements is not possible
915     self.assertRaises(AssertionError, self.ls.add, 'five')
916     self.ls.release()
917
918   def testAcquireWithRepetitions(self):
919     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
920                       set(['two', 'two', 'three']))
921     self.ls.release(['two', 'two'])
922     self.assertEquals(self.ls._list_owned(), set(['three']))
923
924   def testEmptyAcquire(self):
925     # Acquire an empty list of locks...
926     self.assertEquals(self.ls.acquire([]), set())
927     self.assertEquals(self.ls._list_owned(), set())
928     # New locks can still be addded
929     self.assert_(self.ls.add('six'))
930     # "re-acquiring" is not an issue, since we had really acquired nothing
931     self.assertEquals(self.ls.acquire([], shared=1), set())
932     self.assertEquals(self.ls._list_owned(), set())
933     # We haven't really acquired anything, so we cannot release
934     self.assertRaises(AssertionError, self.ls.release)
935
936   def _doLockSet(self, names, shared):
937     try:
938       self.ls.acquire(names, shared=shared)
939       self.done.put('DONE')
940       self.ls.release()
941     except errors.LockError:
942       self.done.put('ERR')
943
944   def _doAddSet(self, names):
945     try:
946       self.ls.add(names, acquired=1)
947       self.done.put('DONE')
948       self.ls.release()
949     except errors.LockError:
950       self.done.put('ERR')
951
952   def _doRemoveSet(self, names):
953     self.done.put(self.ls.remove(names))
954
955   @_Repeat
956   def testConcurrentSharedAcquire(self):
957     self.ls.acquire(['one', 'two'], shared=1)
958     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
959     self._waitThreads()
960     self.assertEqual(self.done.get_nowait(), 'DONE')
961     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
962     self._waitThreads()
963     self.assertEqual(self.done.get_nowait(), 'DONE')
964     self._addThread(target=self._doLockSet, args=('three', 1))
965     self._waitThreads()
966     self.assertEqual(self.done.get_nowait(), 'DONE')
967     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
968     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
969     self.assertRaises(Queue.Empty, self.done.get_nowait)
970     self.ls.release()
971     self._waitThreads()
972     self.assertEqual(self.done.get_nowait(), 'DONE')
973     self.assertEqual(self.done.get_nowait(), 'DONE')
974
975   @_Repeat
976   def testConcurrentExclusiveAcquire(self):
977     self.ls.acquire(['one', 'two'])
978     self._addThread(target=self._doLockSet, args=('three', 1))
979     self._waitThreads()
980     self.assertEqual(self.done.get_nowait(), 'DONE')
981     self._addThread(target=self._doLockSet, args=('three', 0))
982     self._waitThreads()
983     self.assertEqual(self.done.get_nowait(), 'DONE')
984     self.assertRaises(Queue.Empty, self.done.get_nowait)
985     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
986     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
987     self._addThread(target=self._doLockSet, args=('one', 0))
988     self._addThread(target=self._doLockSet, args=('one', 1))
989     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
990     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
991     self.assertRaises(Queue.Empty, self.done.get_nowait)
992     self.ls.release()
993     self._waitThreads()
994     for _ in range(6):
995       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
996
997   @_Repeat
998   def testSimpleAcquireTimeoutExpiring(self):
999     names = sorted(self.ls._names())
1000     self.assert_(len(names) >= 3)
1001
1002     # Get name of first lock
1003     first = names[0]
1004
1005     # Get name of last lock
1006     last = names.pop()
1007
1008     checks = [
1009       # Block first and try to lock it again
1010       (first, first),
1011
1012       # Block last and try to lock all locks
1013       (None, first),
1014
1015       # Block last and try to lock it again
1016       (last, last),
1017       ]
1018
1019     for (wanted, block) in checks:
1020       # Lock in exclusive mode
1021       self.assert_(self.ls.acquire(block, shared=0))
1022
1023       def _AcquireOne():
1024         # Try to get the same lock again with a timeout (should never succeed)
1025         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1026         if acquired:
1027           self.done.put("acquired")
1028           self.ls.release()
1029         else:
1030           self.assert_(acquired is None)
1031           self.assertFalse(self.ls._list_owned())
1032           self.assertFalse(self.ls._is_owned())
1033           self.done.put("not acquired")
1034
1035       self._addThread(target=_AcquireOne)
1036
1037       # Wait for timeout in thread to expire
1038       self._waitThreads()
1039
1040       # Release exclusive lock again
1041       self.ls.release()
1042
1043       self.assertEqual(self.done.get_nowait(), "not acquired")
1044       self.assertRaises(Queue.Empty, self.done.get_nowait)
1045
1046   @_Repeat
1047   def testDelayedAndExpiringLockAcquire(self):
1048     self._setUpLS()
1049     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1050
1051     for expire in (False, True):
1052       names = sorted(self.ls._names())
1053       self.assertEqual(len(names), 8)
1054
1055       lock_ev = dict([(i, threading.Event()) for i in names])
1056
1057       # Lock all in exclusive mode
1058       self.assert_(self.ls.acquire(names, shared=0))
1059
1060       if expire:
1061         # We'll wait at least 300ms per lock
1062         lockwait = len(names) * [0.3]
1063
1064         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1065         # this gives us up to 2.4s to fail.
1066         lockall_timeout = 0.4
1067       else:
1068         # This should finish rather quickly
1069         lockwait = None
1070         lockall_timeout = len(names) * 5.0
1071
1072       def _LockAll():
1073         def acquire_notification(name):
1074           if not expire:
1075             self.done.put("getting %s" % name)
1076
1077           # Kick next lock
1078           lock_ev[name].set()
1079
1080         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1081                            test_notify=acquire_notification):
1082           self.done.put("got all")
1083           self.ls.release()
1084         else:
1085           self.done.put("timeout on all")
1086
1087         # Notify all locks
1088         for ev in lock_ev.values():
1089           ev.set()
1090
1091       t = self._addThread(target=_LockAll)
1092
1093       for idx, name in enumerate(names):
1094         # Wait for actual acquire on this lock to start
1095         lock_ev[name].wait(10.0)
1096
1097         if expire and t.isAlive():
1098           # Wait some time after getting the notification to make sure the lock
1099           # acquire will expire
1100           SafeSleep(lockwait[idx])
1101
1102         self.ls.release(names=name)
1103
1104       self.assertFalse(self.ls._list_owned())
1105
1106       self._waitThreads()
1107
1108       if expire:
1109         # Not checking which locks were actually acquired. Doing so would be
1110         # too timing-dependant.
1111         self.assertEqual(self.done.get_nowait(), "timeout on all")
1112       else:
1113         for i in names:
1114           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1115         self.assertEqual(self.done.get_nowait(), "got all")
1116       self.assertRaises(Queue.Empty, self.done.get_nowait)
1117
1118   @_Repeat
1119   def testConcurrentRemove(self):
1120     self.ls.add('four')
1121     self.ls.acquire(['one', 'two', 'four'])
1122     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1123     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1124     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1125     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1126     self.assertRaises(Queue.Empty, self.done.get_nowait)
1127     self.ls.remove('one')
1128     self.ls.release()
1129     self._waitThreads()
1130     for i in range(4):
1131       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1132     self.ls.add(['five', 'six'], acquired=1)
1133     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1134     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1135     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1136     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1137     self.ls.remove('five')
1138     self.ls.release()
1139     self._waitThreads()
1140     for i in range(4):
1141       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1142     self.ls.acquire(['three', 'four'])
1143     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1144     self.assertRaises(Queue.Empty, self.done.get_nowait)
1145     self.ls.remove('four')
1146     self._waitThreads()
1147     self.assertEqual(self.done.get_nowait(), ['six'])
1148     self._addThread(target=self._doRemoveSet, args=(['two']))
1149     self._waitThreads()
1150     self.assertEqual(self.done.get_nowait(), ['two'])
1151     self.ls.release()
1152     # reset lockset
1153     self._setUpLS()
1154
1155   @_Repeat
1156   def testConcurrentSharedSetLock(self):
1157     # share the set-lock...
1158     self.ls.acquire(None, shared=1)
1159     # ...another thread can share it too
1160     self._addThread(target=self._doLockSet, args=(None, 1))
1161     self._waitThreads()
1162     self.assertEqual(self.done.get_nowait(), 'DONE')
1163     # ...or just share some elements
1164     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1165     self._waitThreads()
1166     self.assertEqual(self.done.get_nowait(), 'DONE')
1167     # ...but not add new ones or remove any
1168     t = self._addThread(target=self._doAddSet, args=(['nine']))
1169     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1170     self.assertRaises(Queue.Empty, self.done.get_nowait)
1171     # this just releases the set-lock
1172     self.ls.release([])
1173     t.join(60)
1174     self.assertEqual(self.done.get_nowait(), 'DONE')
1175     # release the lock on the actual elements so remove() can proceed too
1176     self.ls.release()
1177     self._waitThreads()
1178     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1179     # reset lockset
1180     self._setUpLS()
1181
1182   @_Repeat
1183   def testConcurrentExclusiveSetLock(self):
1184     # acquire the set-lock...
1185     self.ls.acquire(None, shared=0)
1186     # ...no one can do anything else
1187     self._addThread(target=self._doLockSet, args=(None, 1))
1188     self._addThread(target=self._doLockSet, args=(None, 0))
1189     self._addThread(target=self._doLockSet, args=(['three'], 0))
1190     self._addThread(target=self._doLockSet, args=(['two'], 1))
1191     self._addThread(target=self._doAddSet, args=(['nine']))
1192     self.assertRaises(Queue.Empty, self.done.get_nowait)
1193     self.ls.release()
1194     self._waitThreads()
1195     for _ in range(5):
1196       self.assertEqual(self.done.get(True, 1), 'DONE')
1197     # cleanup
1198     self._setUpLS()
1199
1200   @_Repeat
1201   def testConcurrentSetLockAdd(self):
1202     self.ls.acquire('one')
1203     # Another thread wants the whole SetLock
1204     self._addThread(target=self._doLockSet, args=(None, 0))
1205     self._addThread(target=self._doLockSet, args=(None, 1))
1206     self.assertRaises(Queue.Empty, self.done.get_nowait)
1207     self.assertRaises(AssertionError, self.ls.add, 'four')
1208     self.ls.release()
1209     self._waitThreads()
1210     self.assertEqual(self.done.get_nowait(), 'DONE')
1211     self.assertEqual(self.done.get_nowait(), 'DONE')
1212     self.ls.acquire(None)
1213     self._addThread(target=self._doLockSet, args=(None, 0))
1214     self._addThread(target=self._doLockSet, args=(None, 1))
1215     self.assertRaises(Queue.Empty, self.done.get_nowait)
1216     self.ls.add('four')
1217     self.ls.add('five', acquired=1)
1218     self.ls.add('six', acquired=1, shared=1)
1219     self.assertEquals(self.ls._list_owned(),
1220       set(['one', 'two', 'three', 'five', 'six']))
1221     self.assertEquals(self.ls._is_owned(), True)
1222     self.assertEquals(self.ls._names(),
1223       set(['one', 'two', 'three', 'four', 'five', 'six']))
1224     self.ls.release()
1225     self._waitThreads()
1226     self.assertEqual(self.done.get_nowait(), 'DONE')
1227     self.assertEqual(self.done.get_nowait(), 'DONE')
1228     self._setUpLS()
1229
1230   @_Repeat
1231   def testEmptyLockSet(self):
1232     # get the set-lock
1233     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1234     # now empty it...
1235     self.ls.remove(['one', 'two', 'three'])
1236     # and adds/locks by another thread still wait
1237     self._addThread(target=self._doAddSet, args=(['nine']))
1238     self._addThread(target=self._doLockSet, args=(None, 1))
1239     self._addThread(target=self._doLockSet, args=(None, 0))
1240     self.assertRaises(Queue.Empty, self.done.get_nowait)
1241     self.ls.release()
1242     self._waitThreads()
1243     for _ in range(3):
1244       self.assertEqual(self.done.get_nowait(), 'DONE')
1245     # empty it again...
1246     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1247     # now share it...
1248     self.assertEqual(self.ls.acquire(None, shared=1), set())
1249     # other sharers can go, adds still wait
1250     self._addThread(target=self._doLockSet, args=(None, 1))
1251     self._waitThreads()
1252     self.assertEqual(self.done.get_nowait(), 'DONE')
1253     self._addThread(target=self._doAddSet, args=(['nine']))
1254     self.assertRaises(Queue.Empty, self.done.get_nowait)
1255     self.ls.release()
1256     self._waitThreads()
1257     self.assertEqual(self.done.get_nowait(), 'DONE')
1258     self._setUpLS()
1259
1260
1261 class TestGanetiLockManager(_ThreadedTestCase):
1262
1263   def setUp(self):
1264     _ThreadedTestCase.setUp(self)
1265     self.nodes=['n1', 'n2']
1266     self.instances=['i1', 'i2', 'i3']
1267     self.GL = locking.GanetiLockManager(nodes=self.nodes,
1268                                         instances=self.instances)
1269
1270   def tearDown(self):
1271     # Don't try this at home...
1272     locking.GanetiLockManager._instance = None
1273
1274   def testLockingConstants(self):
1275     # The locking library internally cheats by assuming its constants have some
1276     # relationships with each other. Check those hold true.
1277     # This relationship is also used in the Processor to recursively acquire
1278     # the right locks. Again, please don't break it.
1279     for i in range(len(locking.LEVELS)):
1280       self.assertEqual(i, locking.LEVELS[i])
1281
1282   def testDoubleGLFails(self):
1283     self.assertRaises(AssertionError, locking.GanetiLockManager)
1284
1285   def testLockNames(self):
1286     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1287     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1288     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1289                      set(self.instances))
1290
1291   def testInitAndResources(self):
1292     locking.GanetiLockManager._instance = None
1293     self.GL = locking.GanetiLockManager([], [])
1294     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1295     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1296     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1297
1298     locking.GanetiLockManager._instance = None
1299     self.GL = locking.GanetiLockManager(self.nodes, [])
1300     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1301     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1302     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1303
1304     locking.GanetiLockManager._instance = None
1305     self.GL = locking.GanetiLockManager([], self.instances)
1306     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1307     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1308     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1309                      set(self.instances))
1310
1311   def testAcquireRelease(self):
1312     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1313     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1314     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1315     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1316     self.GL.release(locking.LEVEL_NODE, ['n2'])
1317     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1318     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1319     self.GL.release(locking.LEVEL_NODE)
1320     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1321     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1322     self.GL.release(locking.LEVEL_INSTANCE)
1323     self.assertRaises(errors.LockError, self.GL.acquire,
1324                       locking.LEVEL_INSTANCE, ['i5'])
1325     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1326     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1327
1328   def testAcquireWholeSets(self):
1329     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1330     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1331                       set(self.instances))
1332     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1333                       set(self.instances))
1334     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1335                       set(self.nodes))
1336     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1337                       set(self.nodes))
1338     self.GL.release(locking.LEVEL_NODE)
1339     self.GL.release(locking.LEVEL_INSTANCE)
1340     self.GL.release(locking.LEVEL_CLUSTER)
1341
1342   def testAcquireWholeAndPartial(self):
1343     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1344     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1345                       set(self.instances))
1346     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1347                       set(self.instances))
1348     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1349                       set(['n2']))
1350     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1351                       set(['n2']))
1352     self.GL.release(locking.LEVEL_NODE)
1353     self.GL.release(locking.LEVEL_INSTANCE)
1354     self.GL.release(locking.LEVEL_CLUSTER)
1355
1356   def testBGLDependency(self):
1357     self.assertRaises(AssertionError, self.GL.acquire,
1358                       locking.LEVEL_NODE, ['n1', 'n2'])
1359     self.assertRaises(AssertionError, self.GL.acquire,
1360                       locking.LEVEL_INSTANCE, ['i3'])
1361     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1362     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1363     self.assertRaises(AssertionError, self.GL.release,
1364                       locking.LEVEL_CLUSTER, ['BGL'])
1365     self.assertRaises(AssertionError, self.GL.release,
1366                       locking.LEVEL_CLUSTER)
1367     self.GL.release(locking.LEVEL_NODE)
1368     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1369     self.assertRaises(AssertionError, self.GL.release,
1370                       locking.LEVEL_CLUSTER, ['BGL'])
1371     self.assertRaises(AssertionError, self.GL.release,
1372                       locking.LEVEL_CLUSTER)
1373     self.GL.release(locking.LEVEL_INSTANCE)
1374
1375   def testWrongOrder(self):
1376     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1377     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1378     self.assertRaises(AssertionError, self.GL.acquire,
1379                       locking.LEVEL_NODE, ['n1'])
1380     self.assertRaises(AssertionError, self.GL.acquire,
1381                       locking.LEVEL_INSTANCE, ['i2'])
1382
1383   # Helper function to run as a thread that shared the BGL and then acquires
1384   # some locks at another level.
1385   def _doLock(self, level, names, shared):
1386     try:
1387       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1388       self.GL.acquire(level, names, shared=shared)
1389       self.done.put('DONE')
1390       self.GL.release(level)
1391       self.GL.release(locking.LEVEL_CLUSTER)
1392     except errors.LockError:
1393       self.done.put('ERR')
1394
1395   @_Repeat
1396   def testConcurrency(self):
1397     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1398     self._addThread(target=self._doLock,
1399                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1400     self._waitThreads()
1401     self.assertEqual(self.done.get_nowait(), 'DONE')
1402     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1403     self._addThread(target=self._doLock,
1404                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1405     self._waitThreads()
1406     self.assertEqual(self.done.get_nowait(), 'DONE')
1407     self._addThread(target=self._doLock,
1408                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1409     self.assertRaises(Queue.Empty, self.done.get_nowait)
1410     self.GL.release(locking.LEVEL_INSTANCE)
1411     self._waitThreads()
1412     self.assertEqual(self.done.get_nowait(), 'DONE')
1413     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1414     self._addThread(target=self._doLock,
1415                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1416     self._waitThreads()
1417     self.assertEqual(self.done.get_nowait(), 'DONE')
1418     self._addThread(target=self._doLock,
1419                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1420     self.assertRaises(Queue.Empty, self.done.get_nowait)
1421     self.GL.release(locking.LEVEL_INSTANCE)
1422     self._waitThreads()
1423     self.assertEqual(self.done.get(True, 1), 'DONE')
1424     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1425
1426
1427 class TestLockMonitor(_ThreadedTestCase):
1428   def setUp(self):
1429     _ThreadedTestCase.setUp(self)
1430     self.lm = locking.LockMonitor()
1431
1432   def testSingleThread(self):
1433     locks = []
1434
1435     for i in range(100):
1436       name = "TestLock%s" % i
1437       locks.append(locking.SharedLock(name, monitor=self.lm))
1438
1439     self.assertEqual(len(self.lm._locks), len(locks))
1440
1441     # Delete all locks
1442     del locks[:]
1443
1444     # The garbage collector might needs some time
1445     def _CheckLocks():
1446       if self.lm._locks:
1447         raise utils.RetryAgain()
1448
1449     utils.Retry(_CheckLocks, 0.1, 30.0)
1450
1451     self.assertFalse(self.lm._locks)
1452
1453   def testMultiThread(self):
1454     locks = []
1455
1456     def _CreateLock(prev, next, name):
1457       prev.wait()
1458       locks.append(locking.SharedLock(name, monitor=self.lm))
1459       if next:
1460         next.set()
1461
1462     expnames = []
1463
1464     first = threading.Event()
1465     prev = first
1466
1467     # Use a deterministic random generator
1468     for i in random.Random(4263).sample(range(100), 33):
1469       name = "MtTestLock%s" % i
1470       expnames.append(name)
1471
1472       ev = threading.Event()
1473       self._addThread(target=_CreateLock, args=(prev, ev, name))
1474       prev = ev
1475
1476     # Add locks
1477     first.set()
1478     self._waitThreads()
1479
1480     # Check order in which locks were added
1481     self.assertEqual([i.name for i in locks], expnames)
1482
1483     # Sync queries are not supported
1484     self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1485
1486     # Check query result
1487     self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1488                      [[name, None, None] for name in utils.NiceSort(expnames)])
1489
1490     # Test exclusive acquire
1491     for tlock in locks[::4]:
1492       tlock.acquire(shared=0)
1493       try:
1494         def _GetExpResult(name):
1495           if tlock.name == name:
1496             return [name, "exclusive", [threading.currentThread().getName()]]
1497           return [name, None, None]
1498
1499         self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1500                          [_GetExpResult(name)
1501                           for name in utils.NiceSort(expnames)])
1502       finally:
1503         tlock.release()
1504
1505     # Test shared acquire
1506     def _Acquire(lock, shared, ev):
1507       lock.acquire(shared=shared)
1508       try:
1509         ev.wait()
1510       finally:
1511         lock.release()
1512
1513     for tlock1 in locks[::11]:
1514       for tlock2 in locks[::-15]:
1515         if tlock2 == tlock1:
1516           continue
1517
1518         for tlock3 in locks[::10]:
1519           if tlock3 == tlock2:
1520             continue
1521
1522           ev = threading.Event()
1523
1524           # Acquire locks
1525           tthreads1 = []
1526           for i in range(3):
1527             tthreads1.append(self._addThread(target=_Acquire,
1528                                              args=(tlock1, 1, ev)))
1529           tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, ev))
1530           tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, ev))
1531
1532           # Check query result
1533           for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1534                                                          "owner"], False):
1535             if name == tlock1.name:
1536               self.assertEqual(mode, "shared")
1537               self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1538               continue
1539
1540             if name == tlock2.name:
1541               self.assertEqual(mode, "shared")
1542               self.assertEqual(owner, [tthread2.getName()])
1543               continue
1544
1545             if name == tlock3.name:
1546               self.assertEqual(mode, "exclusive")
1547               self.assertEqual(owner, [tthread3.getName()])
1548               continue
1549
1550             self.assert_(name in expnames)
1551             self.assert_(mode is None)
1552             self.assert_(owner is None)
1553
1554           # Release locks again
1555           ev.set()
1556
1557           self._waitThreads()
1558
1559           self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1560                            [[name, None, None]
1561                             for name in utils.NiceSort(expnames)])
1562
1563   def testDelete(self):
1564     lock = locking.SharedLock("TestLock", monitor=self.lm)
1565
1566     self.assertEqual(len(self.lm._locks), 1)
1567     self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1568                      [[lock.name, None, None]])
1569
1570     lock.delete()
1571
1572     self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1573                      [[lock.name, "deleted", None]])
1574     self.assertEqual(len(self.lm._locks), 1)
1575
1576
1577 if __name__ == '__main__':
1578   testutils.GanetiTestProgram()