Watcher: fix some doc typos
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30
31 from ganeti import locking
32 from ganeti import errors
33
34 import testutils
35
36
37 # This is used to test the ssynchronize decorator.
38 # Since it's passed as input to a decorator it must be declared as a global.
39 _decoratorlock = locking.SharedLock()
40
41 #: List for looping tests
42 ITERATIONS = range(8)
43
44
45 def _Repeat(fn):
46   """Decorator for executing a function many times"""
47   def wrapper(*args, **kwargs):
48     for i in ITERATIONS:
49       fn(*args, **kwargs)
50   return wrapper
51
52
53 def SafeSleep(duration):
54   start = time.time()
55   while True:
56     delay = start + duration - time.time()
57     if delay <= 0.0:
58       break
59     time.sleep(delay)
60
61
62 class _ThreadedTestCase(unittest.TestCase):
63   """Test class that supports adding/waiting on threads"""
64   def setUp(self):
65     unittest.TestCase.setUp(self)
66     self.done = Queue.Queue(0)
67     self.threads = []
68
69   def _addThread(self, *args, **kwargs):
70     """Create and remember a new thread"""
71     t = threading.Thread(*args, **kwargs)
72     self.threads.append(t)
73     t.start()
74     return t
75
76   def _waitThreads(self):
77     """Wait for all our threads to finish"""
78     for t in self.threads:
79       t.join(60)
80       self.failIf(t.isAlive())
81     self.threads = []
82
83
84 class _ConditionTestCase(_ThreadedTestCase):
85   """Common test case for conditions"""
86
87   def setUp(self, cls):
88     _ThreadedTestCase.setUp(self)
89     self.lock = threading.Lock()
90     self.cond = cls(self.lock)
91
92   def _testAcquireRelease(self):
93     self.assert_(not self.cond._is_owned())
94     self.assertRaises(RuntimeError, self.cond.wait)
95     self.assertRaises(RuntimeError, self.cond.notifyAll)
96
97     self.cond.acquire()
98     self.assert_(self.cond._is_owned())
99     self.cond.notifyAll()
100     self.assert_(self.cond._is_owned())
101     self.cond.release()
102
103     self.assert_(not self.cond._is_owned())
104     self.assertRaises(RuntimeError, self.cond.wait)
105     self.assertRaises(RuntimeError, self.cond.notifyAll)
106
107   def _testNotification(self):
108     def _NotifyAll():
109       self.done.put("NE")
110       self.cond.acquire()
111       self.done.put("NA")
112       self.cond.notifyAll()
113       self.done.put("NN")
114       self.cond.release()
115
116     self.cond.acquire()
117     self._addThread(target=_NotifyAll)
118     self.assertEqual(self.done.get(True, 1), "NE")
119     self.assertRaises(Queue.Empty, self.done.get_nowait)
120     self.cond.wait()
121     self.assertEqual(self.done.get(True, 1), "NA")
122     self.assertEqual(self.done.get(True, 1), "NN")
123     self.assert_(self.cond._is_owned())
124     self.cond.release()
125     self.assert_(not self.cond._is_owned())
126
127
128 class TestSingleNotifyPipeCondition(_ConditionTestCase):
129   """SingleNotifyPipeCondition tests"""
130
131   def setUp(self):
132     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
133
134   def testAcquireRelease(self):
135     self._testAcquireRelease()
136
137   def testNotification(self):
138     self._testNotification()
139
140   def testWaitReuse(self):
141     self.cond.acquire()
142     self.cond.wait(0)
143     self.cond.wait(0.1)
144     self.cond.release()
145
146   def testNoNotifyReuse(self):
147     self.cond.acquire()
148     self.cond.notifyAll()
149     self.assertRaises(RuntimeError, self.cond.wait)
150     self.assertRaises(RuntimeError, self.cond.notifyAll)
151     self.cond.release()
152
153
154 class TestPipeCondition(_ConditionTestCase):
155   """PipeCondition tests"""
156
157   def setUp(self):
158     _ConditionTestCase.setUp(self, locking.PipeCondition)
159
160   def testAcquireRelease(self):
161     self._testAcquireRelease()
162
163   def testNotification(self):
164     self._testNotification()
165
166   def _TestWait(self, fn):
167     self._addThread(target=fn)
168     self._addThread(target=fn)
169     self._addThread(target=fn)
170
171     # Wait for threads to be waiting
172     self.assertEqual(self.done.get(True, 1), "A")
173     self.assertEqual(self.done.get(True, 1), "A")
174     self.assertEqual(self.done.get(True, 1), "A")
175
176     self.assertRaises(Queue.Empty, self.done.get_nowait)
177
178     self.cond.acquire()
179     self.assertEqual(self.cond._nwaiters, 3)
180     # This new thread can"t acquire the lock, and thus call wait, before we
181     # release it
182     self._addThread(target=fn)
183     self.cond.notifyAll()
184     self.assertRaises(Queue.Empty, self.done.get_nowait)
185     self.cond.release()
186
187     # We should now get 3 W and 1 A (for the new thread) in whatever order
188     w = 0
189     a = 0
190     for i in range(4):
191       got = self.done.get(True, 1)
192       if got == "W":
193         w += 1
194       elif got == "A":
195         a += 1
196       else:
197         self.fail("Got %s on the done queue" % got)
198
199     self.assertEqual(w, 3)
200     self.assertEqual(a, 1)
201
202     self.cond.acquire()
203     self.cond.notifyAll()
204     self.cond.release()
205     self._waitThreads()
206     self.assertEqual(self.done.get_nowait(), "W")
207     self.assertRaises(Queue.Empty, self.done.get_nowait)
208
209   def testBlockingWait(self):
210     def _BlockingWait():
211       self.cond.acquire()
212       self.done.put("A")
213       self.cond.wait()
214       self.cond.release()
215       self.done.put("W")
216
217     self._TestWait(_BlockingWait)
218
219   def testLongTimeoutWait(self):
220     def _Helper():
221       self.cond.acquire()
222       self.done.put("A")
223       self.cond.wait(15.0)
224       self.cond.release()
225       self.done.put("W")
226
227     self._TestWait(_Helper)
228
229   def _TimeoutWait(self, timeout, check):
230     self.cond.acquire()
231     self.cond.wait(timeout)
232     self.cond.release()
233     self.done.put(check)
234
235   def testShortTimeoutWait(self):
236     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
237     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
238     self._waitThreads()
239     self.assertEqual(self.done.get_nowait(), "T1")
240     self.assertEqual(self.done.get_nowait(), "T1")
241     self.assertRaises(Queue.Empty, self.done.get_nowait)
242
243   def testZeroTimeoutWait(self):
244     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
245     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
246     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
247     self._waitThreads()
248     self.assertEqual(self.done.get_nowait(), "T0")
249     self.assertEqual(self.done.get_nowait(), "T0")
250     self.assertEqual(self.done.get_nowait(), "T0")
251     self.assertRaises(Queue.Empty, self.done.get_nowait)
252
253
254 class TestSharedLock(_ThreadedTestCase):
255   """SharedLock tests"""
256
257   def setUp(self):
258     _ThreadedTestCase.setUp(self)
259     self.sl = locking.SharedLock()
260
261   def testSequenceAndOwnership(self):
262     self.assert_(not self.sl._is_owned())
263     self.sl.acquire(shared=1)
264     self.assert_(self.sl._is_owned())
265     self.assert_(self.sl._is_owned(shared=1))
266     self.assert_(not self.sl._is_owned(shared=0))
267     self.sl.release()
268     self.assert_(not self.sl._is_owned())
269     self.sl.acquire()
270     self.assert_(self.sl._is_owned())
271     self.assert_(not self.sl._is_owned(shared=1))
272     self.assert_(self.sl._is_owned(shared=0))
273     self.sl.release()
274     self.assert_(not self.sl._is_owned())
275     self.sl.acquire(shared=1)
276     self.assert_(self.sl._is_owned())
277     self.assert_(self.sl._is_owned(shared=1))
278     self.assert_(not self.sl._is_owned(shared=0))
279     self.sl.release()
280     self.assert_(not self.sl._is_owned())
281
282   def testBooleanValue(self):
283     # semaphores are supposed to return a true value on a successful acquire
284     self.assert_(self.sl.acquire(shared=1))
285     self.sl.release()
286     self.assert_(self.sl.acquire())
287     self.sl.release()
288
289   def testDoubleLockingStoE(self):
290     self.sl.acquire(shared=1)
291     self.assertRaises(AssertionError, self.sl.acquire)
292
293   def testDoubleLockingEtoS(self):
294     self.sl.acquire()
295     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
296
297   def testDoubleLockingStoS(self):
298     self.sl.acquire(shared=1)
299     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
300
301   def testDoubleLockingEtoE(self):
302     self.sl.acquire()
303     self.assertRaises(AssertionError, self.sl.acquire)
304
305   # helper functions: called in a separate thread they acquire the lock, send
306   # their identifier on the done queue, then release it.
307   def _doItSharer(self):
308     try:
309       self.sl.acquire(shared=1)
310       self.done.put('SHR')
311       self.sl.release()
312     except errors.LockError:
313       self.done.put('ERR')
314
315   def _doItExclusive(self):
316     try:
317       self.sl.acquire()
318       self.done.put('EXC')
319       self.sl.release()
320     except errors.LockError:
321       self.done.put('ERR')
322
323   def _doItDelete(self):
324     try:
325       self.sl.delete()
326       self.done.put('DEL')
327     except errors.LockError:
328       self.done.put('ERR')
329
330   def testSharersCanCoexist(self):
331     self.sl.acquire(shared=1)
332     threading.Thread(target=self._doItSharer).start()
333     self.assert_(self.done.get(True, 1))
334     self.sl.release()
335
336   @_Repeat
337   def testExclusiveBlocksExclusive(self):
338     self.sl.acquire()
339     self._addThread(target=self._doItExclusive)
340     self.assertRaises(Queue.Empty, self.done.get_nowait)
341     self.sl.release()
342     self._waitThreads()
343     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
344
345   @_Repeat
346   def testExclusiveBlocksDelete(self):
347     self.sl.acquire()
348     self._addThread(target=self._doItDelete)
349     self.assertRaises(Queue.Empty, self.done.get_nowait)
350     self.sl.release()
351     self._waitThreads()
352     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
353     self.sl = locking.SharedLock()
354
355   @_Repeat
356   def testExclusiveBlocksSharer(self):
357     self.sl.acquire()
358     self._addThread(target=self._doItSharer)
359     self.assertRaises(Queue.Empty, self.done.get_nowait)
360     self.sl.release()
361     self._waitThreads()
362     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
363
364   @_Repeat
365   def testSharerBlocksExclusive(self):
366     self.sl.acquire(shared=1)
367     self._addThread(target=self._doItExclusive)
368     self.assertRaises(Queue.Empty, self.done.get_nowait)
369     self.sl.release()
370     self._waitThreads()
371     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
372
373   @_Repeat
374   def testSharerBlocksDelete(self):
375     self.sl.acquire(shared=1)
376     self._addThread(target=self._doItDelete)
377     self.assertRaises(Queue.Empty, self.done.get_nowait)
378     self.sl.release()
379     self._waitThreads()
380     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
381     self.sl = locking.SharedLock()
382
383   @_Repeat
384   def testWaitingExclusiveBlocksSharer(self):
385     """SKIPPED testWaitingExclusiveBlockSharer"""
386     return
387
388     self.sl.acquire(shared=1)
389     # the lock is acquired in shared mode...
390     self._addThread(target=self._doItExclusive)
391     # ...but now an exclusive is waiting...
392     self._addThread(target=self._doItSharer)
393     # ...so the sharer should be blocked as well
394     self.assertRaises(Queue.Empty, self.done.get_nowait)
395     self.sl.release()
396     self._waitThreads()
397     # The exclusive passed before
398     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
399     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
400
401   @_Repeat
402   def testWaitingSharerBlocksExclusive(self):
403     """SKIPPED testWaitingSharerBlocksExclusive"""
404     return
405
406     self.sl.acquire()
407     # the lock is acquired in exclusive mode...
408     self._addThread(target=self._doItSharer)
409     # ...but now a sharer is waiting...
410     self._addThread(target=self._doItExclusive)
411     # ...the exclusive is waiting too...
412     self.assertRaises(Queue.Empty, self.done.get_nowait)
413     self.sl.release()
414     self._waitThreads()
415     # The sharer passed before
416     self.assertEqual(self.done.get_nowait(), 'SHR')
417     self.assertEqual(self.done.get_nowait(), 'EXC')
418
419   def testDelete(self):
420     self.sl.delete()
421     self.assertRaises(errors.LockError, self.sl.acquire)
422     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
423     self.assertRaises(errors.LockError, self.sl.delete)
424
425   def testDeleteTimeout(self):
426     self.sl.delete(timeout=60)
427
428   def testNoDeleteIfSharer(self):
429     self.sl.acquire(shared=1)
430     self.assertRaises(AssertionError, self.sl.delete)
431
432   @_Repeat
433   def testDeletePendingSharersExclusiveDelete(self):
434     self.sl.acquire()
435     self._addThread(target=self._doItSharer)
436     self._addThread(target=self._doItSharer)
437     self._addThread(target=self._doItExclusive)
438     self._addThread(target=self._doItDelete)
439     self.sl.delete()
440     self._waitThreads()
441     # The threads who were pending return ERR
442     for _ in range(4):
443       self.assertEqual(self.done.get_nowait(), 'ERR')
444     self.sl = locking.SharedLock()
445
446   @_Repeat
447   def testDeletePendingDeleteExclusiveSharers(self):
448     self.sl.acquire()
449     self._addThread(target=self._doItDelete)
450     self._addThread(target=self._doItExclusive)
451     self._addThread(target=self._doItSharer)
452     self._addThread(target=self._doItSharer)
453     self.sl.delete()
454     self._waitThreads()
455     # The two threads who were pending return both ERR
456     self.assertEqual(self.done.get_nowait(), 'ERR')
457     self.assertEqual(self.done.get_nowait(), 'ERR')
458     self.assertEqual(self.done.get_nowait(), 'ERR')
459     self.assertEqual(self.done.get_nowait(), 'ERR')
460     self.sl = locking.SharedLock()
461
462   @_Repeat
463   def testExclusiveAcquireTimeout(self):
464     for shared in [0, 1]:
465       on_queue = threading.Event()
466       release_exclusive = threading.Event()
467
468       def _LockExclusive():
469         self.sl.acquire(shared=0, test_notify=on_queue.set)
470         self.done.put("A: start wait")
471         release_exclusive.wait()
472         self.done.put("A: end wait")
473         self.sl.release()
474
475       # Start thread to hold lock in exclusive mode
476       self._addThread(target=_LockExclusive)
477
478       # Wait for wait to begin
479       self.assertEqual(self.done.get(timeout=60), "A: start wait")
480
481       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
482       # on the queue
483       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
484                                       test_notify=release_exclusive.set))
485
486       self.done.put("got 2nd")
487       self.sl.release()
488
489       self._waitThreads()
490
491       self.assertEqual(self.done.get_nowait(), "A: end wait")
492       self.assertEqual(self.done.get_nowait(), "got 2nd")
493       self.assertRaises(Queue.Empty, self.done.get_nowait)
494
495   @_Repeat
496   def testAcquireExpiringTimeout(self):
497     def _AcquireWithTimeout(shared, timeout):
498       if not self.sl.acquire(shared=shared, timeout=timeout):
499         self.done.put("timeout")
500
501     for shared in [0, 1]:
502       # Lock exclusively
503       self.sl.acquire()
504
505       # Start shared acquires with timeout between 0 and 20 ms
506       for i in range(11):
507         self._addThread(target=_AcquireWithTimeout,
508                         args=(shared, i * 2.0 / 1000.0))
509
510       # Wait for threads to finish (makes sure the acquire timeout expires
511       # before releasing the lock)
512       self._waitThreads()
513
514       # Release lock
515       self.sl.release()
516
517       for _ in range(11):
518         self.assertEqual(self.done.get_nowait(), "timeout")
519
520       self.assertRaises(Queue.Empty, self.done.get_nowait)
521
522   @_Repeat
523   def testSharedSkipExclusiveAcquires(self):
524     # Tests whether shared acquires jump in front of exclusive acquires in the
525     # queue.
526
527     def _Acquire(shared, name, notify_ev, wait_ev):
528       if notify_ev:
529         notify_fn = notify_ev.set
530       else:
531         notify_fn = None
532
533       if wait_ev:
534         wait_ev.wait()
535
536       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
537         return
538
539       self.done.put(name)
540       self.sl.release()
541
542     # Get exclusive lock while we fill the queue
543     self.sl.acquire()
544
545     shrcnt1 = 5
546     shrcnt2 = 7
547     shrcnt3 = 9
548     shrcnt4 = 2
549
550     # Add acquires using threading.Event for synchronization. They'll be
551     # acquired exactly in the order defined in this list.
552     acquires = (shrcnt1 * [(1, "shared 1")] +
553                 3 * [(0, "exclusive 1")] +
554                 shrcnt2 * [(1, "shared 2")] +
555                 shrcnt3 * [(1, "shared 3")] +
556                 shrcnt4 * [(1, "shared 4")] +
557                 3 * [(0, "exclusive 2")])
558
559     ev_cur = None
560     ev_prev = None
561
562     for args in acquires:
563       ev_cur = threading.Event()
564       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
565       ev_prev = ev_cur
566
567     # Wait for last acquire to start
568     ev_prev.wait()
569
570     # Expect 6 pending exclusive acquires and 1 for all shared acquires
571     # together
572     self.assertEqual(self.sl._count_pending(), 7)
573
574     # Release exclusive lock and wait
575     self.sl.release()
576
577     self._waitThreads()
578
579     # Check sequence
580     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
581       # Shared locks aren't guaranteed to be notified in order, but they'll be
582       # first
583       tmp = self.done.get_nowait()
584       if tmp == "shared 1":
585         shrcnt1 -= 1
586       elif tmp == "shared 2":
587         shrcnt2 -= 1
588       elif tmp == "shared 3":
589         shrcnt3 -= 1
590       elif tmp == "shared 4":
591         shrcnt4 -= 1
592     self.assertEqual(shrcnt1, 0)
593     self.assertEqual(shrcnt2, 0)
594     self.assertEqual(shrcnt3, 0)
595     self.assertEqual(shrcnt3, 0)
596
597     for _ in range(3):
598       self.assertEqual(self.done.get_nowait(), "exclusive 1")
599
600     for _ in range(3):
601       self.assertEqual(self.done.get_nowait(), "exclusive 2")
602
603     self.assertRaises(Queue.Empty, self.done.get_nowait)
604
605   @_Repeat
606   def testMixedAcquireTimeout(self):
607     sync = threading.Event()
608
609     def _AcquireShared(ev):
610       if not self.sl.acquire(shared=1, timeout=None):
611         return
612
613       self.done.put("shared")
614
615       # Notify main thread
616       ev.set()
617
618       # Wait for notification from main thread
619       sync.wait()
620
621       # Release lock
622       self.sl.release()
623
624     acquires = []
625     for _ in range(3):
626       ev = threading.Event()
627       self._addThread(target=_AcquireShared, args=(ev, ))
628       acquires.append(ev)
629
630     # Wait for all acquires to finish
631     for i in acquires:
632       i.wait()
633
634     self.assertEqual(self.sl._count_pending(), 0)
635
636     # Try to get exclusive lock
637     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
638
639     # Acquire exclusive without timeout
640     exclsync = threading.Event()
641     exclev = threading.Event()
642
643     def _AcquireExclusive():
644       if not self.sl.acquire(shared=0):
645         return
646
647       self.done.put("exclusive")
648
649       # Notify main thread
650       exclev.set()
651
652       # Wait for notification from main thread
653       exclsync.wait()
654
655       self.sl.release()
656
657     self._addThread(target=_AcquireExclusive)
658
659     # Try to get exclusive lock
660     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
661
662     # Make all shared holders release their locks
663     sync.set()
664
665     # Wait for exclusive acquire to succeed
666     exclev.wait()
667
668     self.assertEqual(self.sl._count_pending(), 0)
669
670     # Try to get exclusive lock
671     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
672
673     def _AcquireSharedSimple():
674       if self.sl.acquire(shared=1, timeout=None):
675         self.done.put("shared2")
676         self.sl.release()
677
678     for _ in range(10):
679       self._addThread(target=_AcquireSharedSimple)
680
681     # Tell exclusive lock to release
682     exclsync.set()
683
684     # Wait for everything to finish
685     self._waitThreads()
686
687     self.assertEqual(self.sl._count_pending(), 0)
688
689     # Check sequence
690     for _ in range(3):
691       self.assertEqual(self.done.get_nowait(), "shared")
692
693     self.assertEqual(self.done.get_nowait(), "exclusive")
694
695     for _ in range(10):
696       self.assertEqual(self.done.get_nowait(), "shared2")
697
698     self.assertRaises(Queue.Empty, self.done.get_nowait)
699
700
701 class TestSSynchronizedDecorator(_ThreadedTestCase):
702   """Shared Lock Synchronized decorator test"""
703
704   def setUp(self):
705     _ThreadedTestCase.setUp(self)
706
707   @locking.ssynchronized(_decoratorlock)
708   def _doItExclusive(self):
709     self.assert_(_decoratorlock._is_owned())
710     self.done.put('EXC')
711
712   @locking.ssynchronized(_decoratorlock, shared=1)
713   def _doItSharer(self):
714     self.assert_(_decoratorlock._is_owned(shared=1))
715     self.done.put('SHR')
716
717   def testDecoratedFunctions(self):
718     self._doItExclusive()
719     self.assert_(not _decoratorlock._is_owned())
720     self._doItSharer()
721     self.assert_(not _decoratorlock._is_owned())
722
723   def testSharersCanCoexist(self):
724     _decoratorlock.acquire(shared=1)
725     threading.Thread(target=self._doItSharer).start()
726     self.assert_(self.done.get(True, 1))
727     _decoratorlock.release()
728
729   @_Repeat
730   def testExclusiveBlocksExclusive(self):
731     _decoratorlock.acquire()
732     self._addThread(target=self._doItExclusive)
733     # give it a bit of time to check that it's not actually doing anything
734     self.assertRaises(Queue.Empty, self.done.get_nowait)
735     _decoratorlock.release()
736     self._waitThreads()
737     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
738
739   @_Repeat
740   def testExclusiveBlocksSharer(self):
741     _decoratorlock.acquire()
742     self._addThread(target=self._doItSharer)
743     self.assertRaises(Queue.Empty, self.done.get_nowait)
744     _decoratorlock.release()
745     self._waitThreads()
746     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
747
748   @_Repeat
749   def testSharerBlocksExclusive(self):
750     _decoratorlock.acquire(shared=1)
751     self._addThread(target=self._doItExclusive)
752     self.assertRaises(Queue.Empty, self.done.get_nowait)
753     _decoratorlock.release()
754     self._waitThreads()
755     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
756
757
758 class TestLockSet(_ThreadedTestCase):
759   """LockSet tests"""
760
761   def setUp(self):
762     _ThreadedTestCase.setUp(self)
763     self._setUpLS()
764
765   def _setUpLS(self):
766     """Helper to (re)initialize the lock set"""
767     self.resources = ['one', 'two', 'three']
768     self.ls = locking.LockSet(members=self.resources)
769
770   def testResources(self):
771     self.assertEquals(self.ls._names(), set(self.resources))
772     newls = locking.LockSet()
773     self.assertEquals(newls._names(), set())
774
775   def testAcquireRelease(self):
776     self.assert_(self.ls.acquire('one'))
777     self.assertEquals(self.ls._list_owned(), set(['one']))
778     self.ls.release()
779     self.assertEquals(self.ls._list_owned(), set())
780     self.assertEquals(self.ls.acquire(['one']), set(['one']))
781     self.assertEquals(self.ls._list_owned(), set(['one']))
782     self.ls.release()
783     self.assertEquals(self.ls._list_owned(), set())
784     self.ls.acquire(['one', 'two', 'three'])
785     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
786     self.ls.release('one')
787     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
788     self.ls.release(['three'])
789     self.assertEquals(self.ls._list_owned(), set(['two']))
790     self.ls.release()
791     self.assertEquals(self.ls._list_owned(), set())
792     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
793     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
794     self.ls.release()
795     self.assertEquals(self.ls._list_owned(), set())
796
797   def testNoDoubleAcquire(self):
798     self.ls.acquire('one')
799     self.assertRaises(AssertionError, self.ls.acquire, 'one')
800     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
801     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
802     self.ls.release()
803     self.ls.acquire(['one', 'three'])
804     self.ls.release('one')
805     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
806     self.ls.release('three')
807
808   def testNoWrongRelease(self):
809     self.assertRaises(AssertionError, self.ls.release)
810     self.ls.acquire('one')
811     self.assertRaises(AssertionError, self.ls.release, 'two')
812
813   def testAddRemove(self):
814     self.ls.add('four')
815     self.assertEquals(self.ls._list_owned(), set())
816     self.assert_('four' in self.ls._names())
817     self.ls.add(['five', 'six', 'seven'], acquired=1)
818     self.assert_('five' in self.ls._names())
819     self.assert_('six' in self.ls._names())
820     self.assert_('seven' in self.ls._names())
821     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
822     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
823     self.assert_('five' not in self.ls._names())
824     self.assert_('six' not in self.ls._names())
825     self.assertEquals(self.ls._list_owned(), set(['seven']))
826     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
827     self.ls.remove('seven')
828     self.assert_('seven' not in self.ls._names())
829     self.assertEquals(self.ls._list_owned(), set([]))
830     self.ls.acquire(None, shared=1)
831     self.assertRaises(AssertionError, self.ls.add, 'eight')
832     self.ls.release()
833     self.ls.acquire(None)
834     self.ls.add('eight', acquired=1)
835     self.assert_('eight' in self.ls._names())
836     self.assert_('eight' in self.ls._list_owned())
837     self.ls.add('nine')
838     self.assert_('nine' in self.ls._names())
839     self.assert_('nine' not in self.ls._list_owned())
840     self.ls.release()
841     self.ls.remove(['two'])
842     self.assert_('two' not in self.ls._names())
843     self.ls.acquire('three')
844     self.assertEquals(self.ls.remove(['three']), ['three'])
845     self.assert_('three' not in self.ls._names())
846     self.assertEquals(self.ls.remove('three'), [])
847     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
848     self.assert_('one' not in self.ls._names())
849
850   def testRemoveNonBlocking(self):
851     self.ls.acquire('one')
852     self.assertEquals(self.ls.remove('one'), ['one'])
853     self.ls.acquire(['two', 'three'])
854     self.assertEquals(self.ls.remove(['two', 'three']),
855                       ['two', 'three'])
856
857   def testNoDoubleAdd(self):
858     self.assertRaises(errors.LockError, self.ls.add, 'two')
859     self.ls.add('four')
860     self.assertRaises(errors.LockError, self.ls.add, 'four')
861
862   def testNoWrongRemoves(self):
863     self.ls.acquire(['one', 'three'], shared=1)
864     # Cannot remove 'two' while holding something which is not a superset
865     self.assertRaises(AssertionError, self.ls.remove, 'two')
866     # Cannot remove 'three' as we are sharing it
867     self.assertRaises(AssertionError, self.ls.remove, 'three')
868
869   def testAcquireSetLock(self):
870     # acquire the set-lock exclusively
871     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
872     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
873     self.assertEquals(self.ls._is_owned(), True)
874     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
875     # I can still add/remove elements...
876     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
877     self.assert_(self.ls.add('six'))
878     self.ls.release()
879     # share the set-lock
880     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
881     # adding new elements is not possible
882     self.assertRaises(AssertionError, self.ls.add, 'five')
883     self.ls.release()
884
885   def testAcquireWithRepetitions(self):
886     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
887                       set(['two', 'two', 'three']))
888     self.ls.release(['two', 'two'])
889     self.assertEquals(self.ls._list_owned(), set(['three']))
890
891   def testEmptyAcquire(self):
892     # Acquire an empty list of locks...
893     self.assertEquals(self.ls.acquire([]), set())
894     self.assertEquals(self.ls._list_owned(), set())
895     # New locks can still be addded
896     self.assert_(self.ls.add('six'))
897     # "re-acquiring" is not an issue, since we had really acquired nothing
898     self.assertEquals(self.ls.acquire([], shared=1), set())
899     self.assertEquals(self.ls._list_owned(), set())
900     # We haven't really acquired anything, so we cannot release
901     self.assertRaises(AssertionError, self.ls.release)
902
903   def _doLockSet(self, names, shared):
904     try:
905       self.ls.acquire(names, shared=shared)
906       self.done.put('DONE')
907       self.ls.release()
908     except errors.LockError:
909       self.done.put('ERR')
910
911   def _doAddSet(self, names):
912     try:
913       self.ls.add(names, acquired=1)
914       self.done.put('DONE')
915       self.ls.release()
916     except errors.LockError:
917       self.done.put('ERR')
918
919   def _doRemoveSet(self, names):
920     self.done.put(self.ls.remove(names))
921
922   @_Repeat
923   def testConcurrentSharedAcquire(self):
924     self.ls.acquire(['one', 'two'], shared=1)
925     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
926     self._waitThreads()
927     self.assertEqual(self.done.get_nowait(), 'DONE')
928     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
929     self._waitThreads()
930     self.assertEqual(self.done.get_nowait(), 'DONE')
931     self._addThread(target=self._doLockSet, args=('three', 1))
932     self._waitThreads()
933     self.assertEqual(self.done.get_nowait(), 'DONE')
934     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
935     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
936     self.assertRaises(Queue.Empty, self.done.get_nowait)
937     self.ls.release()
938     self._waitThreads()
939     self.assertEqual(self.done.get_nowait(), 'DONE')
940     self.assertEqual(self.done.get_nowait(), 'DONE')
941
942   @_Repeat
943   def testConcurrentExclusiveAcquire(self):
944     self.ls.acquire(['one', 'two'])
945     self._addThread(target=self._doLockSet, args=('three', 1))
946     self._waitThreads()
947     self.assertEqual(self.done.get_nowait(), 'DONE')
948     self._addThread(target=self._doLockSet, args=('three', 0))
949     self._waitThreads()
950     self.assertEqual(self.done.get_nowait(), 'DONE')
951     self.assertRaises(Queue.Empty, self.done.get_nowait)
952     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
953     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
954     self._addThread(target=self._doLockSet, args=('one', 0))
955     self._addThread(target=self._doLockSet, args=('one', 1))
956     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
957     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
958     self.assertRaises(Queue.Empty, self.done.get_nowait)
959     self.ls.release()
960     self._waitThreads()
961     for _ in range(6):
962       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
963
964   @_Repeat
965   def testSimpleAcquireTimeoutExpiring(self):
966     names = sorted(self.ls._names())
967     self.assert_(len(names) >= 3)
968
969     # Get name of first lock
970     first = names[0]
971
972     # Get name of last lock
973     last = names.pop()
974
975     checks = [
976       # Block first and try to lock it again
977       (first, first),
978
979       # Block last and try to lock all locks
980       (None, first),
981
982       # Block last and try to lock it again
983       (last, last),
984       ]
985
986     for (wanted, block) in checks:
987       # Lock in exclusive mode
988       self.assert_(self.ls.acquire(block, shared=0))
989
990       def _AcquireOne():
991         # Try to get the same lock again with a timeout (should never succeed)
992         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
993         if acquired:
994           self.done.put("acquired")
995           self.ls.release()
996         else:
997           self.assert_(acquired is None)
998           self.assert_(not self.ls._list_owned())
999           self.assert_(not self.ls._is_owned())
1000           self.done.put("not acquired")
1001
1002       self._addThread(target=_AcquireOne)
1003
1004       # Wait for timeout in thread to expire
1005       self._waitThreads()
1006
1007       # Release exclusive lock again
1008       self.ls.release()
1009
1010       self.assertEqual(self.done.get_nowait(), "not acquired")
1011       self.assertRaises(Queue.Empty, self.done.get_nowait)
1012
1013   @_Repeat
1014   def testDelayedAndExpiringLockAcquire(self):
1015     self._setUpLS()
1016     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1017
1018     for expire in (False, True):
1019       names = sorted(self.ls._names())
1020       self.assertEqual(len(names), 8)
1021
1022       lock_ev = dict([(i, threading.Event()) for i in names])
1023
1024       # Lock all in exclusive mode
1025       self.assert_(self.ls.acquire(names, shared=0))
1026
1027       if expire:
1028         # We'll wait at least 300ms per lock
1029         lockwait = len(names) * [0.3]
1030
1031         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1032         # this gives us up to 2.4s to fail.
1033         lockall_timeout = 0.4
1034       else:
1035         # This should finish rather quickly
1036         lockwait = None
1037         lockall_timeout = len(names) * 5.0
1038
1039       def _LockAll():
1040         def acquire_notification(name):
1041           if not expire:
1042             self.done.put("getting %s" % name)
1043
1044           # Kick next lock
1045           lock_ev[name].set()
1046
1047         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1048                            test_notify=acquire_notification):
1049           self.done.put("got all")
1050           self.ls.release()
1051         else:
1052           self.done.put("timeout on all")
1053
1054         # Notify all locks
1055         for ev in lock_ev.values():
1056           ev.set()
1057
1058       t = self._addThread(target=_LockAll)
1059
1060       for idx, name in enumerate(names):
1061         # Wait for actual acquire on this lock to start
1062         lock_ev[name].wait(10.0)
1063
1064         if expire and t.isAlive():
1065           # Wait some time after getting the notification to make sure the lock
1066           # acquire will expire
1067           SafeSleep(lockwait[idx])
1068
1069         self.ls.release(names=name)
1070
1071       self.assert_(not self.ls._list_owned())
1072
1073       self._waitThreads()
1074
1075       if expire:
1076         # Not checking which locks were actually acquired. Doing so would be
1077         # too timing-dependant.
1078         self.assertEqual(self.done.get_nowait(), "timeout on all")
1079       else:
1080         for i in names:
1081           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1082         self.assertEqual(self.done.get_nowait(), "got all")
1083       self.assertRaises(Queue.Empty, self.done.get_nowait)
1084
1085   @_Repeat
1086   def testConcurrentRemove(self):
1087     self.ls.add('four')
1088     self.ls.acquire(['one', 'two', 'four'])
1089     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1090     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1091     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1092     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1093     self.assertRaises(Queue.Empty, self.done.get_nowait)
1094     self.ls.remove('one')
1095     self.ls.release()
1096     self._waitThreads()
1097     for i in range(4):
1098       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1099     self.ls.add(['five', 'six'], acquired=1)
1100     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1101     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1102     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1103     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1104     self.ls.remove('five')
1105     self.ls.release()
1106     self._waitThreads()
1107     for i in range(4):
1108       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1109     self.ls.acquire(['three', 'four'])
1110     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1111     self.assertRaises(Queue.Empty, self.done.get_nowait)
1112     self.ls.remove('four')
1113     self._waitThreads()
1114     self.assertEqual(self.done.get_nowait(), ['six'])
1115     self._addThread(target=self._doRemoveSet, args=(['two']))
1116     self._waitThreads()
1117     self.assertEqual(self.done.get_nowait(), ['two'])
1118     self.ls.release()
1119     # reset lockset
1120     self._setUpLS()
1121
1122   @_Repeat
1123   def testConcurrentSharedSetLock(self):
1124     # share the set-lock...
1125     self.ls.acquire(None, shared=1)
1126     # ...another thread can share it too
1127     self._addThread(target=self._doLockSet, args=(None, 1))
1128     self._waitThreads()
1129     self.assertEqual(self.done.get_nowait(), 'DONE')
1130     # ...or just share some elements
1131     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1132     self._waitThreads()
1133     self.assertEqual(self.done.get_nowait(), 'DONE')
1134     # ...but not add new ones or remove any
1135     t = self._addThread(target=self._doAddSet, args=(['nine']))
1136     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1137     self.assertRaises(Queue.Empty, self.done.get_nowait)
1138     # this just releases the set-lock
1139     self.ls.release([])
1140     t.join(60)
1141     self.assertEqual(self.done.get_nowait(), 'DONE')
1142     # release the lock on the actual elements so remove() can proceed too
1143     self.ls.release()
1144     self._waitThreads()
1145     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1146     # reset lockset
1147     self._setUpLS()
1148
1149   @_Repeat
1150   def testConcurrentExclusiveSetLock(self):
1151     # acquire the set-lock...
1152     self.ls.acquire(None, shared=0)
1153     # ...no one can do anything else
1154     self._addThread(target=self._doLockSet, args=(None, 1))
1155     self._addThread(target=self._doLockSet, args=(None, 0))
1156     self._addThread(target=self._doLockSet, args=(['three'], 0))
1157     self._addThread(target=self._doLockSet, args=(['two'], 1))
1158     self._addThread(target=self._doAddSet, args=(['nine']))
1159     self.assertRaises(Queue.Empty, self.done.get_nowait)
1160     self.ls.release()
1161     self._waitThreads()
1162     for _ in range(5):
1163       self.assertEqual(self.done.get(True, 1), 'DONE')
1164     # cleanup
1165     self._setUpLS()
1166
1167   @_Repeat
1168   def testConcurrentSetLockAdd(self):
1169     self.ls.acquire('one')
1170     # Another thread wants the whole SetLock
1171     self._addThread(target=self._doLockSet, args=(None, 0))
1172     self._addThread(target=self._doLockSet, args=(None, 1))
1173     self.assertRaises(Queue.Empty, self.done.get_nowait)
1174     self.assertRaises(AssertionError, self.ls.add, 'four')
1175     self.ls.release()
1176     self._waitThreads()
1177     self.assertEqual(self.done.get_nowait(), 'DONE')
1178     self.assertEqual(self.done.get_nowait(), 'DONE')
1179     self.ls.acquire(None)
1180     self._addThread(target=self._doLockSet, args=(None, 0))
1181     self._addThread(target=self._doLockSet, args=(None, 1))
1182     self.assertRaises(Queue.Empty, self.done.get_nowait)
1183     self.ls.add('four')
1184     self.ls.add('five', acquired=1)
1185     self.ls.add('six', acquired=1, shared=1)
1186     self.assertEquals(self.ls._list_owned(),
1187       set(['one', 'two', 'three', 'five', 'six']))
1188     self.assertEquals(self.ls._is_owned(), True)
1189     self.assertEquals(self.ls._names(),
1190       set(['one', 'two', 'three', 'four', 'five', 'six']))
1191     self.ls.release()
1192     self._waitThreads()
1193     self.assertEqual(self.done.get_nowait(), 'DONE')
1194     self.assertEqual(self.done.get_nowait(), 'DONE')
1195     self._setUpLS()
1196
1197   @_Repeat
1198   def testEmptyLockSet(self):
1199     # get the set-lock
1200     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1201     # now empty it...
1202     self.ls.remove(['one', 'two', 'three'])
1203     # and adds/locks by another thread still wait
1204     self._addThread(target=self._doAddSet, args=(['nine']))
1205     self._addThread(target=self._doLockSet, args=(None, 1))
1206     self._addThread(target=self._doLockSet, args=(None, 0))
1207     self.assertRaises(Queue.Empty, self.done.get_nowait)
1208     self.ls.release()
1209     self._waitThreads()
1210     for _ in range(3):
1211       self.assertEqual(self.done.get_nowait(), 'DONE')
1212     # empty it again...
1213     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1214     # now share it...
1215     self.assertEqual(self.ls.acquire(None, shared=1), set())
1216     # other sharers can go, adds still wait
1217     self._addThread(target=self._doLockSet, args=(None, 1))
1218     self._waitThreads()
1219     self.assertEqual(self.done.get_nowait(), 'DONE')
1220     self._addThread(target=self._doAddSet, args=(['nine']))
1221     self.assertRaises(Queue.Empty, self.done.get_nowait)
1222     self.ls.release()
1223     self._waitThreads()
1224     self.assertEqual(self.done.get_nowait(), 'DONE')
1225     self._setUpLS()
1226
1227
1228 class TestGanetiLockManager(_ThreadedTestCase):
1229
1230   def setUp(self):
1231     _ThreadedTestCase.setUp(self)
1232     self.nodes=['n1', 'n2']
1233     self.instances=['i1', 'i2', 'i3']
1234     self.GL = locking.GanetiLockManager(nodes=self.nodes,
1235                                         instances=self.instances)
1236
1237   def tearDown(self):
1238     # Don't try this at home...
1239     locking.GanetiLockManager._instance = None
1240
1241   def testLockingConstants(self):
1242     # The locking library internally cheats by assuming its constants have some
1243     # relationships with each other. Check those hold true.
1244     # This relationship is also used in the Processor to recursively acquire
1245     # the right locks. Again, please don't break it.
1246     for i in range(len(locking.LEVELS)):
1247       self.assertEqual(i, locking.LEVELS[i])
1248
1249   def testDoubleGLFails(self):
1250     self.assertRaises(AssertionError, locking.GanetiLockManager)
1251
1252   def testLockNames(self):
1253     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1254     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1255     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1256                      set(self.instances))
1257
1258   def testInitAndResources(self):
1259     locking.GanetiLockManager._instance = None
1260     self.GL = locking.GanetiLockManager()
1261     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1262     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1263     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1264
1265     locking.GanetiLockManager._instance = None
1266     self.GL = locking.GanetiLockManager(nodes=self.nodes)
1267     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1268     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1269     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1270
1271     locking.GanetiLockManager._instance = None
1272     self.GL = locking.GanetiLockManager(instances=self.instances)
1273     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1274     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1275     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1276                      set(self.instances))
1277
1278   def testAcquireRelease(self):
1279     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1280     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1281     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1282     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1283     self.GL.release(locking.LEVEL_NODE, ['n2'])
1284     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1285     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1286     self.GL.release(locking.LEVEL_NODE)
1287     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1288     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1289     self.GL.release(locking.LEVEL_INSTANCE)
1290     self.assertRaises(errors.LockError, self.GL.acquire,
1291                       locking.LEVEL_INSTANCE, ['i5'])
1292     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1293     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1294
1295   def testAcquireWholeSets(self):
1296     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1297     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1298                       set(self.instances))
1299     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1300                       set(self.instances))
1301     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1302                       set(self.nodes))
1303     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1304                       set(self.nodes))
1305     self.GL.release(locking.LEVEL_NODE)
1306     self.GL.release(locking.LEVEL_INSTANCE)
1307     self.GL.release(locking.LEVEL_CLUSTER)
1308
1309   def testAcquireWholeAndPartial(self):
1310     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1311     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1312                       set(self.instances))
1313     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1314                       set(self.instances))
1315     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1316                       set(['n2']))
1317     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1318                       set(['n2']))
1319     self.GL.release(locking.LEVEL_NODE)
1320     self.GL.release(locking.LEVEL_INSTANCE)
1321     self.GL.release(locking.LEVEL_CLUSTER)
1322
1323   def testBGLDependency(self):
1324     self.assertRaises(AssertionError, self.GL.acquire,
1325                       locking.LEVEL_NODE, ['n1', 'n2'])
1326     self.assertRaises(AssertionError, self.GL.acquire,
1327                       locking.LEVEL_INSTANCE, ['i3'])
1328     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1329     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1330     self.assertRaises(AssertionError, self.GL.release,
1331                       locking.LEVEL_CLUSTER, ['BGL'])
1332     self.assertRaises(AssertionError, self.GL.release,
1333                       locking.LEVEL_CLUSTER)
1334     self.GL.release(locking.LEVEL_NODE)
1335     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1336     self.assertRaises(AssertionError, self.GL.release,
1337                       locking.LEVEL_CLUSTER, ['BGL'])
1338     self.assertRaises(AssertionError, self.GL.release,
1339                       locking.LEVEL_CLUSTER)
1340     self.GL.release(locking.LEVEL_INSTANCE)
1341
1342   def testWrongOrder(self):
1343     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1344     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1345     self.assertRaises(AssertionError, self.GL.acquire,
1346                       locking.LEVEL_NODE, ['n1'])
1347     self.assertRaises(AssertionError, self.GL.acquire,
1348                       locking.LEVEL_INSTANCE, ['i2'])
1349
1350   # Helper function to run as a thread that shared the BGL and then acquires
1351   # some locks at another level.
1352   def _doLock(self, level, names, shared):
1353     try:
1354       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1355       self.GL.acquire(level, names, shared=shared)
1356       self.done.put('DONE')
1357       self.GL.release(level)
1358       self.GL.release(locking.LEVEL_CLUSTER)
1359     except errors.LockError:
1360       self.done.put('ERR')
1361
1362   @_Repeat
1363   def testConcurrency(self):
1364     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1365     self._addThread(target=self._doLock,
1366                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1367     self._waitThreads()
1368     self.assertEqual(self.done.get_nowait(), 'DONE')
1369     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1370     self._addThread(target=self._doLock,
1371                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1372     self._waitThreads()
1373     self.assertEqual(self.done.get_nowait(), 'DONE')
1374     self._addThread(target=self._doLock,
1375                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1376     self.assertRaises(Queue.Empty, self.done.get_nowait)
1377     self.GL.release(locking.LEVEL_INSTANCE)
1378     self._waitThreads()
1379     self.assertEqual(self.done.get_nowait(), 'DONE')
1380     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1381     self._addThread(target=self._doLock,
1382                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1383     self._waitThreads()
1384     self.assertEqual(self.done.get_nowait(), 'DONE')
1385     self._addThread(target=self._doLock,
1386                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1387     self.assertRaises(Queue.Empty, self.done.get_nowait)
1388     self.GL.release(locking.LEVEL_INSTANCE)
1389     self._waitThreads()
1390     self.assertEqual(self.done.get(True, 1), 'DONE')
1391     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1392
1393
1394 if __name__ == '__main__':
1395   testutils.GanetiTestProgram()