Modify gnt-node add to call external script
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30
31 from ganeti import locking
32 from ganeti import errors
33
34 import testutils
35
36
37 # This is used to test the ssynchronized decorator.
38 # Since it's passed as input to a decorator it must be declared as a global.
39 _decoratorlock = locking.SharedLock("decorator lock")
40
41 #: List for looping tests
42 ITERATIONS = range(8)
43
44
45 def _Repeat(fn):
46   """Decorator for executing a function many times"""
47   def wrapper(*args, **kwargs):
48     for i in ITERATIONS:
49       fn(*args, **kwargs)
50   return wrapper
51
52
def SafeSleep(duration):
  """Block until at least C{duration} seconds of wall-clock time have passed.

  The deadline is computed once from C{time.time()}; C{time.sleep} is then
  called with the remaining time as often as needed, so a sleep that returns
  early does not shorten the overall delay. Returns immediately when
  C{duration} is zero or negative.

  """
  deadline = time.time() + duration
  while True:
    remaining = deadline - time.time()
    if remaining <= 0.0:
      return
    time.sleep(remaining)
60
61
62 class _ThreadedTestCase(unittest.TestCase):
63   """Test class that supports adding/waiting on threads"""
64   def setUp(self):
65     unittest.TestCase.setUp(self)
66     self.done = Queue.Queue(0)
67     self.threads = []
68
69   def _addThread(self, *args, **kwargs):
70     """Create and remember a new thread"""
71     t = threading.Thread(*args, **kwargs)
72     self.threads.append(t)
73     t.start()
74     return t
75
76   def _waitThreads(self):
77     """Wait for all our threads to finish"""
78     for t in self.threads:
79       t.join(60)
80       self.failIf(t.isAlive())
81     self.threads = []
82
83
class _ConditionTestCase(_ThreadedTestCase):
  """Common test case for conditions

  Subclasses call L{setUp} with the condition class under test; the shared
  checks are exposed as C{_test*} helpers so they are not collected as tests
  on this base class.

  """

  def setUp(self, cls):
    """Create a plain lock and wrap it in a condition of type C{cls}."""
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _assertNotOwned(self):
    """Check that the condition is unowned and refuses wait/notifyAll."""
    self.assertFalse(self.cond._is_owned())
    self.assertRaises(RuntimeError, self.cond.wait)
    self.assertRaises(RuntimeError, self.cond.notifyAll)

  def _testAcquireRelease(self):
    """Check ownership before, during and after an acquire/release pair."""
    self._assertNotOwned()

    self.cond.acquire()
    self.assertTrue(self.cond._is_owned())
    self.cond.notifyAll()
    self.assertTrue(self.cond._is_owned())
    self.cond.release()

    self._assertNotOwned()

  def _testNotification(self):
    """Check that a waiter is woken up by notifyAll from another thread.

    The helper thread reports "NE" on entry, "NA" once it acquired the
    condition and "NN" after notifying; the main thread checks that only
    "NE" is visible before it starts waiting and that the other two markers
    are available once its wait returned.

    """
    def _NotifyAll():
      self.done.put("NE")
      self.cond.acquire()
      self.done.put("NA")
      self.cond.notifyAll()
      self.done.put("NN")
      self.cond.release()

    self.cond.acquire()
    self._addThread(target=_NotifyAll)
    self.assertEqual(self.done.get(True, 1), "NE")
    # We still hold the condition, so the helper must not have got further
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.wait()
    self.assertEqual(self.done.get(True, 1), "NA")
    self.assertEqual(self.done.get(True, 1), "NN")
    self.assertTrue(self.cond._is_owned())
    self.cond.release()
    self.assertFalse(self.cond._is_owned())
126
127
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """SingleNotifyPipeCondition tests"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    # Waiting repeatedly on the same condition must work as long as nobody
    # notified it
    cond = self.cond
    cond.acquire()
    for timeout in (0, 0.1):
      cond.wait(timeout)
    cond.release()

  def testNoNotifyReuse(self):
    # Once notified, both wait and notifyAll are expected to be refused
    cond = self.cond
    cond.acquire()
    cond.notifyAll()
    for method in (cond.wait, cond.notifyAll):
      self.assertRaises(RuntimeError, method)
    cond.release()
152
153
class TestPipeCondition(_ConditionTestCase):
  """PipeCondition tests"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run four waiter threads executing C{fn} through two notifications.

    C{fn} must put "A" on the done queue once it holds the condition and "W"
    after its wait returned and the condition was released.

    """
    for _ in range(3):
      self._addThread(target=fn)

    # Wait for threads to be waiting
    for _ in range(3):
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(self.cond._nwaiters, 3)
    # This new thread can't acquire the lock, and thus call wait, before we
    # release it
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # We should now get 3 W and 1 A (for the new thread) in whatever order
    seen = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in seen:
        self.fail("Got %s on the done queue" % got)
      seen[got] += 1

    self.assertEqual(seen["W"], 3)
    self.assertEqual(seen["A"], 1)

    # Second notification wakes up the late-comer
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _MakeWaiter(self, *wait_args):
    """Build a thread function waiting on the condition.

    C{wait_args} are passed unchanged to the condition's C{wait} method.

    """
    def _Waiter():
      self.cond.acquire()
      self.done.put("A")
      self.cond.wait(*wait_args)
      self.cond.release()
      self.done.put("W")

    return _Waiter

  def testBlockingWait(self):
    self._TestWait(self._MakeWaiter())

  def testLongTimeoutWait(self):
    self._TestWait(self._MakeWaiter(15.0))

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with C{timeout}, then report C{check}."""
    self.cond.acquire()
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put(check)

  def _RunTimeoutWaiters(self, count, timeout, marker):
    """Start C{count} timeout waiters and verify each reported C{marker}."""
    for _ in range(count):
      self._addThread(target=self._TimeoutWait, args=(timeout, marker))
    self._waitThreads()
    for _ in range(count):
      self.assertEqual(self.done.get_nowait(), marker)
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testShortTimeoutWait(self):
    self._RunTimeoutWaiters(2, 0.1, "T1")

  def testZeroTimeoutWait(self):
    self._RunTimeoutWaiters(3, 0, "T0")
252
253
class TestSharedLock(_ThreadedTestCase):
  """SharedLock tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLock")

  def testSequenceAndOwnership(self):
    # Cycle shared -> exclusive -> shared and check the ownership queries in
    # each state
    self.assertFalse(self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assertTrue(self.sl._is_owned())
    self.assertTrue(self.sl._is_owned(shared=1))
    self.assertFalse(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl._is_owned())
    self.sl.acquire()
    self.assertTrue(self.sl._is_owned())
    self.assertFalse(self.sl._is_owned(shared=1))
    self.assertTrue(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assertTrue(self.sl._is_owned())
    self.assertTrue(self.sl._is_owned(shared=1))
    self.assertFalse(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl._is_owned())

  def testBooleanValue(self):
    # semaphores are supposed to return a true value on a successful acquire
    self.assertTrue(self.sl.acquire(shared=1))
    self.sl.release()
    self.assertTrue(self.sl.acquire())
    self.sl.release()

  def testDoubleLockingStoE(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire)

  def testDoubleLockingEtoS(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingStoS(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingEtoE(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire)

  # helper functions: called in a separate thread they acquire the lock, send
  # their identifier on the done queue, then release it.
  def _doItSharer(self):
    """Acquire shared, report 'SHR', release; report 'ERR' on LockError."""
    try:
      self.sl.acquire(shared=1)
      self.done.put('SHR')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItExclusive(self):
    """Acquire exclusive, report 'EXC', release; report 'ERR' on LockError."""
    try:
      self.sl.acquire()
      self.done.put('EXC')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItDelete(self):
    """Delete the lock and report 'DEL'; report 'ERR' on LockError."""
    try:
      self.sl.delete()
      self.done.put('DEL')
    except errors.LockError:
      self.done.put('ERR')

  def testSharersCanCoexist(self):
    self.sl.acquire(shared=1)
    threading.Thread(target=self._doItSharer).start()
    self.assertTrue(self.done.get(True, 1))
    self.sl.release()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    self.sl.acquire()
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DEL')
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testExclusiveBlocksSharer(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testSharerBlocksDelete(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DEL')
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # Deliberately disabled: everything below the return is kept for
    # reference only and never runs
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.assertEqual(self.done.get_nowait(), 'EXC')
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # Deliberately disabled: everything below the return is kept for
    # reference only and never runs
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')

  def testDelete(self):
    self.sl.delete()
    self.assertRaises(errors.LockError, self.sl.acquire)
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
    self.assertRaises(errors.LockError, self.sl.delete)

  def testDeleteTimeout(self):
    self.sl.delete(timeout=60)

  def testNoDeleteIfSharer(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.delete)

  @_Repeat
  def testDeletePendingSharersExclusiveDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItDelete)
    self.sl.delete()
    self._waitThreads()
    # The threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testDeletePendingDeleteExclusiveSharers(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self.sl.delete()
    self._waitThreads()
    # All four threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testExclusiveAcquireTimeout(self):
    for shared in [0, 1]:
      on_queue = threading.Event()
      release_exclusive = threading.Event()

      def _LockExclusive():
        self.sl.acquire(shared=0, test_notify=on_queue.set)
        self.done.put("A: start wait")
        release_exclusive.wait()
        self.done.put("A: end wait")
        self.sl.release()

      # Start thread to hold lock in exclusive mode
      self._addThread(target=_LockExclusive)

      # Wait for wait to begin
      self.assertEqual(self.done.get(timeout=60), "A: start wait")

      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
      # on the queue
      self.assertTrue(self.sl.acquire(shared=shared, timeout=60,
                                      test_notify=release_exclusive.set))

      self.done.put("got 2nd")
      self.sl.release()

      self._waitThreads()

      self.assertEqual(self.done.get_nowait(), "A: end wait")
      self.assertEqual(self.done.get_nowait(), "got 2nd")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testAcquireExpiringTimeout(self):
    def _AcquireWithTimeout(shared, timeout):
      if not self.sl.acquire(shared=shared, timeout=timeout):
        self.done.put("timeout")

    for shared in [0, 1]:
      # Lock exclusively
      self.sl.acquire()

      # Start acquires in the mode under test with timeout between 0 and
      # 20 ms
      for i in range(11):
        self._addThread(target=_AcquireWithTimeout,
                        args=(shared, i * 2.0 / 1000.0))

      # Wait for threads to finish (makes sure the acquire timeout expires
      # before releasing the lock)
      self._waitThreads()

      # Release lock
      self.sl.release()

      for _ in range(11):
        self.assertEqual(self.done.get_nowait(), "timeout")

      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testSharedSkipExclusiveAcquires(self):
    # Tests whether shared acquires jump in front of exclusive acquires in the
    # queue.

    def _Acquire(shared, name, notify_ev, wait_ev):
      if notify_ev:
        notify_fn = notify_ev.set
      else:
        notify_fn = None

      if wait_ev:
        wait_ev.wait()

      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
        return

      self.done.put(name)
      self.sl.release()

    # Get exclusive lock while we fill the queue
    self.sl.acquire()

    shrcnt1 = 5
    shrcnt2 = 7
    shrcnt3 = 9
    shrcnt4 = 2

    # Add acquires using threading.Event for synchronization. They'll be
    # acquired exactly in the order defined in this list.
    acquires = (shrcnt1 * [(1, "shared 1")] +
                3 * [(0, "exclusive 1")] +
                shrcnt2 * [(1, "shared 2")] +
                shrcnt3 * [(1, "shared 3")] +
                shrcnt4 * [(1, "shared 4")] +
                3 * [(0, "exclusive 2")])

    ev_cur = None
    ev_prev = None

    # Chain the threads: each one waits for its predecessor's event before
    # trying to acquire and signals its own event via test_notify
    for args in acquires:
      ev_cur = threading.Event()
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
      ev_prev = ev_cur

    # Wait for last acquire to start
    ev_prev.wait()

    # Expect 6 pending exclusive acquires and 1 for all shared acquires
    # together
    self.assertEqual(self.sl._count_pending(), 7)

    # Release exclusive lock and wait
    self.sl.release()

    self._waitThreads()

    # Check sequence
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
      # Shared locks aren't guaranteed to be notified in order, but they'll be
      # first
      tmp = self.done.get_nowait()
      if tmp == "shared 1":
        shrcnt1 -= 1
      elif tmp == "shared 2":
        shrcnt2 -= 1
      elif tmp == "shared 3":
        shrcnt3 -= 1
      elif tmp == "shared 4":
        shrcnt4 -= 1
    self.assertEqual(shrcnt1, 0)
    self.assertEqual(shrcnt2, 0)
    self.assertEqual(shrcnt3, 0)
    # Every group must be fully accounted for, including the fourth one
    self.assertEqual(shrcnt4, 0)

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "exclusive 1")

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "exclusive 2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testMixedAcquireTimeout(self):
    sync = threading.Event()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)
699
700
class TestSharedLockInCondition(_ThreadedTestCase):
  """SharedLock as a condition lock tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Create the condition under test; overridden by subclasses."""
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    # A timed-out wait must leave the lock held in the mode it was acquired
    # in, first for shared and then for exclusive
    for shared in (1, 0):
      self.cond.acquire(shared=shared)
      self.assertTrue(self.sl._is_owned(shared=shared))
      self.cond.wait(0)
      self.assertTrue(self.sl._is_owned(shared=shared))
      self.cond.release()
722     self.cond.release()
723
724
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """SharedLock as a pipe condition lock tests"""

  def setCondition(self):
    """Wrap the shared lock in a PipeCondition instead of the default."""
    pipe_cond = locking.PipeCondition(self.sl)
    self.cond = pipe_cond
730
731
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Check the decorator lock is owned while running, then report 'EXC'."""
    self.assertTrue(_decoratorlock._is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Check the decorator lock is owned shared, then report 'SHR'."""
    self.assertTrue(_decoratorlock._is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    # The lock must be released again once a decorated function returned
    self._doItExclusive()
    self.assertFalse(_decoratorlock._is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock._is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    threading.Thread(target=self._doItSharer).start()
    self.assertTrue(self.done.get(True, 1))
    _decoratorlock.release()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItExclusive)
    # give it a bit of time to check that it's not actually doing anything
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')
787
788
789 class TestLockSet(_ThreadedTestCase):
790   """LockSet tests"""
791
792   def setUp(self):
793     _ThreadedTestCase.setUp(self)
794     self._setUpLS()
795
796   def _setUpLS(self):
797     """Helper to (re)initialize the lock set"""
798     self.resources = ['one', 'two', 'three']
799     self.ls = locking.LockSet(self.resources, "TestLockSet")
800
801   def testResources(self):
802     self.assertEquals(self.ls._names(), set(self.resources))
803     newls = locking.LockSet([], "TestLockSet.testResources")
804     self.assertEquals(newls._names(), set())
805
806   def testAcquireRelease(self):
807     self.assert_(self.ls.acquire('one'))
808     self.assertEquals(self.ls._list_owned(), set(['one']))
809     self.ls.release()
810     self.assertEquals(self.ls._list_owned(), set())
811     self.assertEquals(self.ls.acquire(['one']), set(['one']))
812     self.assertEquals(self.ls._list_owned(), set(['one']))
813     self.ls.release()
814     self.assertEquals(self.ls._list_owned(), set())
815     self.ls.acquire(['one', 'two', 'three'])
816     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
817     self.ls.release('one')
818     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
819     self.ls.release(['three'])
820     self.assertEquals(self.ls._list_owned(), set(['two']))
821     self.ls.release()
822     self.assertEquals(self.ls._list_owned(), set())
823     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
824     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
825     self.ls.release()
826     self.assertEquals(self.ls._list_owned(), set())
827
828   def testNoDoubleAcquire(self):
829     self.ls.acquire('one')
830     self.assertRaises(AssertionError, self.ls.acquire, 'one')
831     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
832     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
833     self.ls.release()
834     self.ls.acquire(['one', 'three'])
835     self.ls.release('one')
836     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
837     self.ls.release('three')
838
839   def testNoWrongRelease(self):
840     self.assertRaises(AssertionError, self.ls.release)
841     self.ls.acquire('one')
842     self.assertRaises(AssertionError, self.ls.release, 'two')
843
844   def testAddRemove(self):
845     self.ls.add('four')
846     self.assertEquals(self.ls._list_owned(), set())
847     self.assert_('four' in self.ls._names())
848     self.ls.add(['five', 'six', 'seven'], acquired=1)
849     self.assert_('five' in self.ls._names())
850     self.assert_('six' in self.ls._names())
851     self.assert_('seven' in self.ls._names())
852     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
853     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
854     self.assert_('five' not in self.ls._names())
855     self.assert_('six' not in self.ls._names())
856     self.assertEquals(self.ls._list_owned(), set(['seven']))
857     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
858     self.ls.remove('seven')
859     self.assert_('seven' not in self.ls._names())
860     self.assertEquals(self.ls._list_owned(), set([]))
861     self.ls.acquire(None, shared=1)
862     self.assertRaises(AssertionError, self.ls.add, 'eight')
863     self.ls.release()
864     self.ls.acquire(None)
865     self.ls.add('eight', acquired=1)
866     self.assert_('eight' in self.ls._names())
867     self.assert_('eight' in self.ls._list_owned())
868     self.ls.add('nine')
869     self.assert_('nine' in self.ls._names())
870     self.assert_('nine' not in self.ls._list_owned())
871     self.ls.release()
872     self.ls.remove(['two'])
873     self.assert_('two' not in self.ls._names())
874     self.ls.acquire('three')
875     self.assertEquals(self.ls.remove(['three']), ['three'])
876     self.assert_('three' not in self.ls._names())
877     self.assertEquals(self.ls.remove('three'), [])
878     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
879     self.assert_('one' not in self.ls._names())
880
881   def testRemoveNonBlocking(self):
882     self.ls.acquire('one')
883     self.assertEquals(self.ls.remove('one'), ['one'])
884     self.ls.acquire(['two', 'three'])
885     self.assertEquals(self.ls.remove(['two', 'three']),
886                       ['two', 'three'])
887
888   def testNoDoubleAdd(self):
889     self.assertRaises(errors.LockError, self.ls.add, 'two')
890     self.ls.add('four')
891     self.assertRaises(errors.LockError, self.ls.add, 'four')
892
893   def testNoWrongRemoves(self):
894     self.ls.acquire(['one', 'three'], shared=1)
895     # Cannot remove 'two' while holding something which is not a superset
896     self.assertRaises(AssertionError, self.ls.remove, 'two')
897     # Cannot remove 'three' as we are sharing it
898     self.assertRaises(AssertionError, self.ls.remove, 'three')
899
900   def testAcquireSetLock(self):
901     # acquire the set-lock exclusively
902     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
903     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
904     self.assertEquals(self.ls._is_owned(), True)
905     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
906     # I can still add/remove elements...
907     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
908     self.assert_(self.ls.add('six'))
909     self.ls.release()
910     # share the set-lock
911     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
912     # adding new elements is not possible
913     self.assertRaises(AssertionError, self.ls.add, 'five')
914     self.ls.release()
915
916   def testAcquireWithRepetitions(self):
917     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
918                       set(['two', 'two', 'three']))
919     self.ls.release(['two', 'two'])
920     self.assertEquals(self.ls._list_owned(), set(['three']))
921
922   def testEmptyAcquire(self):
923     # Acquire an empty list of locks...
924     self.assertEquals(self.ls.acquire([]), set())
925     self.assertEquals(self.ls._list_owned(), set())
926     # New locks can still be addded
927     self.assert_(self.ls.add('six'))
928     # "re-acquiring" is not an issue, since we had really acquired nothing
929     self.assertEquals(self.ls.acquire([], shared=1), set())
930     self.assertEquals(self.ls._list_owned(), set())
931     # We haven't really acquired anything, so we cannot release
932     self.assertRaises(AssertionError, self.ls.release)
933
934   def _doLockSet(self, names, shared):
935     try:
936       self.ls.acquire(names, shared=shared)
937       self.done.put('DONE')
938       self.ls.release()
939     except errors.LockError:
940       self.done.put('ERR')
941
942   def _doAddSet(self, names):
943     try:
944       self.ls.add(names, acquired=1)
945       self.done.put('DONE')
946       self.ls.release()
947     except errors.LockError:
948       self.done.put('ERR')
949
950   def _doRemoveSet(self, names):
951     self.done.put(self.ls.remove(names))
952
953   @_Repeat
954   def testConcurrentSharedAcquire(self):
955     self.ls.acquire(['one', 'two'], shared=1)
956     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
957     self._waitThreads()
958     self.assertEqual(self.done.get_nowait(), 'DONE')
959     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
960     self._waitThreads()
961     self.assertEqual(self.done.get_nowait(), 'DONE')
962     self._addThread(target=self._doLockSet, args=('three', 1))
963     self._waitThreads()
964     self.assertEqual(self.done.get_nowait(), 'DONE')
965     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
966     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
967     self.assertRaises(Queue.Empty, self.done.get_nowait)
968     self.ls.release()
969     self._waitThreads()
970     self.assertEqual(self.done.get_nowait(), 'DONE')
971     self.assertEqual(self.done.get_nowait(), 'DONE')
972
973   @_Repeat
974   def testConcurrentExclusiveAcquire(self):
975     self.ls.acquire(['one', 'two'])
976     self._addThread(target=self._doLockSet, args=('three', 1))
977     self._waitThreads()
978     self.assertEqual(self.done.get_nowait(), 'DONE')
979     self._addThread(target=self._doLockSet, args=('three', 0))
980     self._waitThreads()
981     self.assertEqual(self.done.get_nowait(), 'DONE')
982     self.assertRaises(Queue.Empty, self.done.get_nowait)
983     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
984     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
985     self._addThread(target=self._doLockSet, args=('one', 0))
986     self._addThread(target=self._doLockSet, args=('one', 1))
987     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
988     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
989     self.assertRaises(Queue.Empty, self.done.get_nowait)
990     self.ls.release()
991     self._waitThreads()
992     for _ in range(6):
993       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
994
995   @_Repeat
996   def testSimpleAcquireTimeoutExpiring(self):
997     names = sorted(self.ls._names())
998     self.assert_(len(names) >= 3)
999
1000     # Get name of first lock
1001     first = names[0]
1002
1003     # Get name of last lock
1004     last = names.pop()
1005
1006     checks = [
1007       # Block first and try to lock it again
1008       (first, first),
1009
1010       # Block last and try to lock all locks
1011       (None, first),
1012
1013       # Block last and try to lock it again
1014       (last, last),
1015       ]
1016
1017     for (wanted, block) in checks:
1018       # Lock in exclusive mode
1019       self.assert_(self.ls.acquire(block, shared=0))
1020
1021       def _AcquireOne():
1022         # Try to get the same lock again with a timeout (should never succeed)
1023         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1024         if acquired:
1025           self.done.put("acquired")
1026           self.ls.release()
1027         else:
1028           self.assert_(acquired is None)
1029           self.assertFalse(self.ls._list_owned())
1030           self.assertFalse(self.ls._is_owned())
1031           self.done.put("not acquired")
1032
1033       self._addThread(target=_AcquireOne)
1034
1035       # Wait for timeout in thread to expire
1036       self._waitThreads()
1037
1038       # Release exclusive lock again
1039       self.ls.release()
1040
1041       self.assertEqual(self.done.get_nowait(), "not acquired")
1042       self.assertRaises(Queue.Empty, self.done.get_nowait)
1043
1044   @_Repeat
1045   def testDelayedAndExpiringLockAcquire(self):
1046     self._setUpLS()
1047     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1048
1049     for expire in (False, True):
1050       names = sorted(self.ls._names())
1051       self.assertEqual(len(names), 8)
1052
1053       lock_ev = dict([(i, threading.Event()) for i in names])
1054
1055       # Lock all in exclusive mode
1056       self.assert_(self.ls.acquire(names, shared=0))
1057
1058       if expire:
1059         # We'll wait at least 300ms per lock
1060         lockwait = len(names) * [0.3]
1061
1062         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1063         # this gives us up to 2.4s to fail.
1064         lockall_timeout = 0.4
1065       else:
1066         # This should finish rather quickly
1067         lockwait = None
1068         lockall_timeout = len(names) * 5.0
1069
1070       def _LockAll():
1071         def acquire_notification(name):
1072           if not expire:
1073             self.done.put("getting %s" % name)
1074
1075           # Kick next lock
1076           lock_ev[name].set()
1077
1078         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1079                            test_notify=acquire_notification):
1080           self.done.put("got all")
1081           self.ls.release()
1082         else:
1083           self.done.put("timeout on all")
1084
1085         # Notify all locks
1086         for ev in lock_ev.values():
1087           ev.set()
1088
1089       t = self._addThread(target=_LockAll)
1090
1091       for idx, name in enumerate(names):
1092         # Wait for actual acquire on this lock to start
1093         lock_ev[name].wait(10.0)
1094
1095         if expire and t.isAlive():
1096           # Wait some time after getting the notification to make sure the lock
1097           # acquire will expire
1098           SafeSleep(lockwait[idx])
1099
1100         self.ls.release(names=name)
1101
1102       self.assertFalse(self.ls._list_owned())
1103
1104       self._waitThreads()
1105
1106       if expire:
1107         # Not checking which locks were actually acquired. Doing so would be
1108         # too timing-dependant.
1109         self.assertEqual(self.done.get_nowait(), "timeout on all")
1110       else:
1111         for i in names:
1112           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1113         self.assertEqual(self.done.get_nowait(), "got all")
1114       self.assertRaises(Queue.Empty, self.done.get_nowait)
1115
1116   @_Repeat
1117   def testConcurrentRemove(self):
1118     self.ls.add('four')
1119     self.ls.acquire(['one', 'two', 'four'])
1120     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1121     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1122     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1123     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1124     self.assertRaises(Queue.Empty, self.done.get_nowait)
1125     self.ls.remove('one')
1126     self.ls.release()
1127     self._waitThreads()
1128     for i in range(4):
1129       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1130     self.ls.add(['five', 'six'], acquired=1)
1131     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1132     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1133     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1134     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1135     self.ls.remove('five')
1136     self.ls.release()
1137     self._waitThreads()
1138     for i in range(4):
1139       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1140     self.ls.acquire(['three', 'four'])
1141     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1142     self.assertRaises(Queue.Empty, self.done.get_nowait)
1143     self.ls.remove('four')
1144     self._waitThreads()
1145     self.assertEqual(self.done.get_nowait(), ['six'])
1146     self._addThread(target=self._doRemoveSet, args=(['two']))
1147     self._waitThreads()
1148     self.assertEqual(self.done.get_nowait(), ['two'])
1149     self.ls.release()
1150     # reset lockset
1151     self._setUpLS()
1152
1153   @_Repeat
1154   def testConcurrentSharedSetLock(self):
1155     # share the set-lock...
1156     self.ls.acquire(None, shared=1)
1157     # ...another thread can share it too
1158     self._addThread(target=self._doLockSet, args=(None, 1))
1159     self._waitThreads()
1160     self.assertEqual(self.done.get_nowait(), 'DONE')
1161     # ...or just share some elements
1162     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1163     self._waitThreads()
1164     self.assertEqual(self.done.get_nowait(), 'DONE')
1165     # ...but not add new ones or remove any
1166     t = self._addThread(target=self._doAddSet, args=(['nine']))
1167     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1168     self.assertRaises(Queue.Empty, self.done.get_nowait)
1169     # this just releases the set-lock
1170     self.ls.release([])
1171     t.join(60)
1172     self.assertEqual(self.done.get_nowait(), 'DONE')
1173     # release the lock on the actual elements so remove() can proceed too
1174     self.ls.release()
1175     self._waitThreads()
1176     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1177     # reset lockset
1178     self._setUpLS()
1179
1180   @_Repeat
1181   def testConcurrentExclusiveSetLock(self):
1182     # acquire the set-lock...
1183     self.ls.acquire(None, shared=0)
1184     # ...no one can do anything else
1185     self._addThread(target=self._doLockSet, args=(None, 1))
1186     self._addThread(target=self._doLockSet, args=(None, 0))
1187     self._addThread(target=self._doLockSet, args=(['three'], 0))
1188     self._addThread(target=self._doLockSet, args=(['two'], 1))
1189     self._addThread(target=self._doAddSet, args=(['nine']))
1190     self.assertRaises(Queue.Empty, self.done.get_nowait)
1191     self.ls.release()
1192     self._waitThreads()
1193     for _ in range(5):
1194       self.assertEqual(self.done.get(True, 1), 'DONE')
1195     # cleanup
1196     self._setUpLS()
1197
1198   @_Repeat
1199   def testConcurrentSetLockAdd(self):
1200     self.ls.acquire('one')
1201     # Another thread wants the whole SetLock
1202     self._addThread(target=self._doLockSet, args=(None, 0))
1203     self._addThread(target=self._doLockSet, args=(None, 1))
1204     self.assertRaises(Queue.Empty, self.done.get_nowait)
1205     self.assertRaises(AssertionError, self.ls.add, 'four')
1206     self.ls.release()
1207     self._waitThreads()
1208     self.assertEqual(self.done.get_nowait(), 'DONE')
1209     self.assertEqual(self.done.get_nowait(), 'DONE')
1210     self.ls.acquire(None)
1211     self._addThread(target=self._doLockSet, args=(None, 0))
1212     self._addThread(target=self._doLockSet, args=(None, 1))
1213     self.assertRaises(Queue.Empty, self.done.get_nowait)
1214     self.ls.add('four')
1215     self.ls.add('five', acquired=1)
1216     self.ls.add('six', acquired=1, shared=1)
1217     self.assertEquals(self.ls._list_owned(),
1218       set(['one', 'two', 'three', 'five', 'six']))
1219     self.assertEquals(self.ls._is_owned(), True)
1220     self.assertEquals(self.ls._names(),
1221       set(['one', 'two', 'three', 'four', 'five', 'six']))
1222     self.ls.release()
1223     self._waitThreads()
1224     self.assertEqual(self.done.get_nowait(), 'DONE')
1225     self.assertEqual(self.done.get_nowait(), 'DONE')
1226     self._setUpLS()
1227
1228   @_Repeat
1229   def testEmptyLockSet(self):
1230     # get the set-lock
1231     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1232     # now empty it...
1233     self.ls.remove(['one', 'two', 'three'])
1234     # and adds/locks by another thread still wait
1235     self._addThread(target=self._doAddSet, args=(['nine']))
1236     self._addThread(target=self._doLockSet, args=(None, 1))
1237     self._addThread(target=self._doLockSet, args=(None, 0))
1238     self.assertRaises(Queue.Empty, self.done.get_nowait)
1239     self.ls.release()
1240     self._waitThreads()
1241     for _ in range(3):
1242       self.assertEqual(self.done.get_nowait(), 'DONE')
1243     # empty it again...
1244     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1245     # now share it...
1246     self.assertEqual(self.ls.acquire(None, shared=1), set())
1247     # other sharers can go, adds still wait
1248     self._addThread(target=self._doLockSet, args=(None, 1))
1249     self._waitThreads()
1250     self.assertEqual(self.done.get_nowait(), 'DONE')
1251     self._addThread(target=self._doAddSet, args=(['nine']))
1252     self.assertRaises(Queue.Empty, self.done.get_nowait)
1253     self.ls.release()
1254     self._waitThreads()
1255     self.assertEqual(self.done.get_nowait(), 'DONE')
1256     self._setUpLS()
1257
1258
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for locking.GanetiLockManager with two nodes and three instances"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.nodes = ['n1', 'n2']
    self.instances = ['i1', 'i2', 'i3']
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
                                        instances=self.instances)

  def tearDown(self):
    # Don't try this at home...
    # Clearing the class-level instance lets the next setUp build a new
    # manager (see testDoubleGLFails for what happens otherwise).
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    # setUp already created a manager, a second one must be refused
    self.assertRaises(AssertionError, locking.GanetiLockManager)

  def testLockNames(self):
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testInitAndResources(self):
    # Re-create the manager with several node/instance combinations and
    # check the names reported at every level
    combinations = [
      ([], []),
      (self.nodes, []),
      ([], self.instances),
      ]

    for (nodes, instances) in combinations:
      locking.GanetiLockManager._instance = None
      self.GL = locking.GanetiLockManager(nodes, instances)
      self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
      self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
      self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                       set(instances))

  def testAcquireRelease(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
    # Releasing a single node keeps the other one owned
    self.GL.release(locking.LEVEL_NODE, ['n2'])
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    # Releasing the node level leaves the instance level untouched
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    self.GL.release(locking.LEVEL_INSTANCE)
    # 'i5' is not among the instances given to the manager in setUp
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))

  def testAcquireWholeSets(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # Passing None as names acquires the whole level
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # Whole instance level, but only a single node
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(['n2']))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    # Nothing can be acquired at other levels before the BGL is held
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1', 'n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i3'])
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # The BGL cannot be released while a node lock is still held
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)
    # ...nor while instance locks are still held
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)

  def testWrongOrder(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
    # While holding a node lock, neither another node lock nor an instance
    # lock may be acquired
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i2'])

  def _doLock(self, level, names, shared):
    """Thread helper: share the BGL, then acquire locks at another level

    Puts 'DONE' on self.done once both acquires returned, or 'ERR' if a
    LockError was raised anywhere in the sequence.

    @param level: locking level passed to self.GL.acquire and release
    @param names: names passed to self.GL.acquire at that level
    @param shared: passed as the "shared" keyword of that acquire

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
      self.GL.acquire(level, names, shared=shared)
      self.done.put('DONE')
      self.GL.release(level)
      self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put('ERR')

  @_Repeat
  def testConcurrency(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # With only the BGL shared, another thread can take an instance lock
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # Holding 'i3' exclusively does not get in the way of 'i1'...
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...but a request for 'i3' itself waits until we release it
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # A shared 'i2' can be shared by another thread...
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...while an exclusive request waits for our release
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1423
1424
# Run the tests through ganeti's own test program when executed as a script
if "__main__" == __name__:
  testutils.GanetiTestProgram()