bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
# Module-level lock used for testing a synchronization decorator of the
# locking module (presumably "ssynchronized"; the code using it is not part
# of this excerpt -- TODO confirm).
# Since it's passed as input to a decorator it must be declared as a global,
# i.e. it has to exist before any decorated function is defined.
_decoratorlock = locking.SharedLock("decorator lock")
48
49 #: List for looping tests
50 ITERATIONS = range(8)
51
52
53 def _Repeat(fn):
54   """Decorator for executing a function many times"""
55   def wrapper(*args, **kwargs):
56     for i in ITERATIONS:
57       fn(*args, **kwargs)
58   return wrapper
59
60
def SafeSleep(duration):
  """Sleep for at least C{duration} seconds.

  C{time.sleep} is called with the remaining time over and over again until
  C{time.time} reports that the deadline has been reached.

  @type duration: number
  @param duration: Minimum time to sleep, in seconds; nothing happens for
    values less than or equal to zero

  """
  deadline = time.time() + duration
  remaining = deadline - time.time()
  while remaining > 0.0:
    time.sleep(remaining)
    remaining = deadline - time.time()
68
69
70 class _ThreadedTestCase(unittest.TestCase):
71   """Test class that supports adding/waiting on threads"""
72   def setUp(self):
73     unittest.TestCase.setUp(self)
74     self.done = Queue.Queue(0)
75     self.threads = []
76
77   def _addThread(self, *args, **kwargs):
78     """Create and remember a new thread"""
79     t = threading.Thread(*args, **kwargs)
80     self.threads.append(t)
81     t.start()
82     return t
83
84   def _waitThreads(self):
85     """Wait for all our threads to finish"""
86     for t in self.threads:
87       t.join(60)
88       self.failIf(t.isAlive())
89     self.threads = []
90
91
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the test cases for condition classes.

  Subclasses pass the condition class to L{setUp} and call the C{_test*}
  helpers from their own test methods.

  """

  def setUp(self, cls):
    """Create a C{threading.Lock} and a condition of class C{cls} using it.

    @param cls: Condition class, called with the lock as its only argument

    """
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _AssertNotOwned(self):
    """Check that the condition isn't owned and can't be used in this state.

    """
    self.assertFalse(self.cond._is_owned())
    self.assertRaises(RuntimeError, self.cond.wait, None)
    self.assertRaises(RuntimeError, self.cond.notifyAll)

  def _testAcquireRelease(self):
    """Check ownership before, while and after holding the condition.

    """
    self._AssertNotOwned()

    self.cond.acquire()
    # Notifying must not give up ownership
    self.assertTrue(self.cond._is_owned())
    self.cond.notifyAll()
    self.assertTrue(self.cond._is_owned())
    self.cond.release()

    self._AssertNotOwned()

  def _testNotification(self):
    """Check that a waiter is woken up by C{notifyAll} from another thread.

    The notifying thread reports its progress on C{self.done}: "NE" when
    started, "NA" after acquiring the condition and "NN" after notifying.

    """
    def _Notifier():
      self.done.put("NE")
      self.cond.acquire()
      self.done.put("NA")
      self.cond.notifyAll()
      self.done.put("NN")
      self.cond.release()

    self.cond.acquire()
    self._addThread(target=_Notifier)
    self.assertEqual(self.done.get(True, 1), "NE")
    # Nothing else is expected on the queue while we hold the condition
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.wait(None)
    for expected in ("NA", "NN"):
      self.assertEqual(self.done.get(True, 1), expected)
    self.assertTrue(self.cond._is_owned())
    self.cond.release()
    self.assertFalse(self.cond._is_owned())
134
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """Tests for C{locking.SingleNotifyPipeCondition}"""

  def setUp(self):
    condition_class = locking.SingleNotifyPipeCondition
    _ConditionTestCase.setUp(self, condition_class)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    # Waiting more than once on the same condition, using timeouts of 0 and
    # 0.1 seconds, must work without a notification
    self.cond.acquire()
    for timeout in (0, 0.1):
      self.cond.wait(timeout)
    self.cond.release()

  def testNoNotifyReuse(self):
    # After notifying once, further waits and notifications must be refused
    # with a RuntimeError
    self.cond.acquire()
    self.cond.notifyAll()
    for (fn, args) in [(self.cond.wait, (None, )), (self.cond.notifyAll, ())]:
      self.assertRaises(RuntimeError, fn, *args)
    self.cond.release()
160
161
class TestPipeCondition(_ConditionTestCase):
  """Tests for C{locking.PipeCondition}"""

  def setUp(self):
    condition_class = locking.PipeCondition
    _ConditionTestCase.setUp(self, condition_class)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check how they're notified.

    @param fn: Function run in every thread; expected to put "A" on
      C{self.done} once it holds the condition and "W" after its wait on the
      condition ended

    """
    threads = [self._addThread(target=fn) for _ in range(3)]

    # Every thread reports "A" before it starts waiting
    for _ in threads:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(len(self.cond._waiters), 3)
    self.assertEqual(self.cond._waiters, set(threads))
    # As long as we hold the lock this additional thread is unable to acquire
    # it and hence can't call wait
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # Three "W" (notified threads) and one "A" (the additional thread) are
    # expected now, the order doesn't matter
    counts = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in counts:
        self.fail("Got %s on the done queue" % got)
      counts[got] += 1

    self.assertEqual(counts["W"], 3)
    self.assertEqual(counts["A"], 1)

    # Wake up the additional thread, too
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _WaitAndReport(self, timeout):
    """Wait on the condition, reporting "A" before and "W" after the wait.

    @param timeout: Timeout passed to the condition's C{wait} method

    """
    self.cond.acquire()
    self.done.put("A")
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put("W")

  def testBlockingWait(self):
    # Waiters without timeout
    self._TestWait(lambda: self._WaitAndReport(None))

  def testLongTimeoutWait(self):
    # Waiters with a timeout of 15 seconds
    self._TestWait(lambda: self._WaitAndReport(15.0))

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with a timeout, then report C{check}.

    @param timeout: Timeout passed to the condition's C{wait} method
    @param check: Value put on C{self.done} after the condition was released

    """
    self.cond.acquire()
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    # Two waiters with a timeout of 0.1 seconds, nobody notifies them
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    # Three waiters with a timeout of zero, nobody notifies them
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262
263
264 class TestSharedLock(_ThreadedTestCase):
265   """SharedLock tests"""
266
267   def setUp(self):
268     _ThreadedTestCase.setUp(self)
269     self.sl = locking.SharedLock("TestSharedLock")
270
271   def testSequenceAndOwnership(self):
272     self.assertFalse(self.sl.is_owned())
273     self.sl.acquire(shared=1)
274     self.assert_(self.sl.is_owned())
275     self.assert_(self.sl.is_owned(shared=1))
276     self.assertFalse(self.sl.is_owned(shared=0))
277     self.sl.release()
278     self.assertFalse(self.sl.is_owned())
279     self.sl.acquire()
280     self.assert_(self.sl.is_owned())
281     self.assertFalse(self.sl.is_owned(shared=1))
282     self.assert_(self.sl.is_owned(shared=0))
283     self.sl.release()
284     self.assertFalse(self.sl.is_owned())
285     self.sl.acquire(shared=1)
286     self.assert_(self.sl.is_owned())
287     self.assert_(self.sl.is_owned(shared=1))
288     self.assertFalse(self.sl.is_owned(shared=0))
289     self.sl.release()
290     self.assertFalse(self.sl.is_owned())
291
292   def testBooleanValue(self):
293     # semaphores are supposed to return a true value on a successful acquire
294     self.assert_(self.sl.acquire(shared=1))
295     self.sl.release()
296     self.assert_(self.sl.acquire())
297     self.sl.release()
298
299   def testDoubleLockingStoE(self):
300     self.sl.acquire(shared=1)
301     self.assertRaises(AssertionError, self.sl.acquire)
302
303   def testDoubleLockingEtoS(self):
304     self.sl.acquire()
305     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306
307   def testDoubleLockingStoS(self):
308     self.sl.acquire(shared=1)
309     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310
311   def testDoubleLockingEtoE(self):
312     self.sl.acquire()
313     self.assertRaises(AssertionError, self.sl.acquire)
314
315   # helper functions: called in a separate thread they acquire the lock, send
316   # their identifier on the done queue, then release it.
317   def _doItSharer(self):
318     try:
319       self.sl.acquire(shared=1)
320       self.done.put('SHR')
321       self.sl.release()
322     except errors.LockError:
323       self.done.put('ERR')
324
325   def _doItExclusive(self):
326     try:
327       self.sl.acquire()
328       self.done.put('EXC')
329       self.sl.release()
330     except errors.LockError:
331       self.done.put('ERR')
332
333   def _doItDelete(self):
334     try:
335       self.sl.delete()
336       self.done.put('DEL')
337     except errors.LockError:
338       self.done.put('ERR')
339
340   def testSharersCanCoexist(self):
341     self.sl.acquire(shared=1)
342     threading.Thread(target=self._doItSharer).start()
343     self.assert_(self.done.get(True, 1))
344     self.sl.release()
345
346   @_Repeat
347   def testExclusiveBlocksExclusive(self):
348     self.sl.acquire()
349     self._addThread(target=self._doItExclusive)
350     self.assertRaises(Queue.Empty, self.done.get_nowait)
351     self.sl.release()
352     self._waitThreads()
353     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354
355   @_Repeat
356   def testExclusiveBlocksDelete(self):
357     self.sl.acquire()
358     self._addThread(target=self._doItDelete)
359     self.assertRaises(Queue.Empty, self.done.get_nowait)
360     self.sl.release()
361     self._waitThreads()
362     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363     self.sl = locking.SharedLock(self.sl.name)
364
365   @_Repeat
366   def testExclusiveBlocksSharer(self):
367     self.sl.acquire()
368     self._addThread(target=self._doItSharer)
369     self.assertRaises(Queue.Empty, self.done.get_nowait)
370     self.sl.release()
371     self._waitThreads()
372     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373
374   @_Repeat
375   def testSharerBlocksExclusive(self):
376     self.sl.acquire(shared=1)
377     self._addThread(target=self._doItExclusive)
378     self.assertRaises(Queue.Empty, self.done.get_nowait)
379     self.sl.release()
380     self._waitThreads()
381     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382
383   @_Repeat
384   def testSharerBlocksDelete(self):
385     self.sl.acquire(shared=1)
386     self._addThread(target=self._doItDelete)
387     self.assertRaises(Queue.Empty, self.done.get_nowait)
388     self.sl.release()
389     self._waitThreads()
390     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391     self.sl = locking.SharedLock(self.sl.name)
392
  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # This test is disabled: the unconditional return below makes everything
    # after it unreachable. The remaining code is only kept for reference.
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410
  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # This test is disabled: the unconditional return below makes everything
    # after it unreachable. The remaining code is only kept for reference.
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')
428
429   def testDelete(self):
430     self.sl.delete()
431     self.assertRaises(errors.LockError, self.sl.acquire)
432     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433     self.assertRaises(errors.LockError, self.sl.delete)
434
435   def testDeleteTimeout(self):
436     self.assertTrue(self.sl.delete(timeout=60))
437
438   def testDeleteTimeoutFail(self):
439     ready = threading.Event()
440     finish = threading.Event()
441
442     def fn():
443       self.sl.acquire(shared=0)
444       ready.set()
445
446       finish.wait()
447       self.sl.release()
448
449     self._addThread(target=fn)
450     ready.wait()
451
452     # Test if deleting a lock owned in exclusive mode by another thread fails
453     # to delete when a timeout is used
454     self.assertFalse(self.sl.delete(timeout=0.02))
455
456     finish.set()
457     self._waitThreads()
458
459     self.assertTrue(self.sl.delete())
460     self.assertRaises(errors.LockError, self.sl.acquire)
461
462   def testNoDeleteIfSharer(self):
463     self.sl.acquire(shared=1)
464     self.assertRaises(AssertionError, self.sl.delete)
465
466   @_Repeat
467   def testDeletePendingSharersExclusiveDelete(self):
468     self.sl.acquire()
469     self._addThread(target=self._doItSharer)
470     self._addThread(target=self._doItSharer)
471     self._addThread(target=self._doItExclusive)
472     self._addThread(target=self._doItDelete)
473     self.sl.delete()
474     self._waitThreads()
475     # The threads who were pending return ERR
476     for _ in range(4):
477       self.assertEqual(self.done.get_nowait(), 'ERR')
478     self.sl = locking.SharedLock(self.sl.name)
479
480   @_Repeat
481   def testDeletePendingDeleteExclusiveSharers(self):
482     self.sl.acquire()
483     self._addThread(target=self._doItDelete)
484     self._addThread(target=self._doItExclusive)
485     self._addThread(target=self._doItSharer)
486     self._addThread(target=self._doItSharer)
487     self.sl.delete()
488     self._waitThreads()
489     # The two threads who were pending return both ERR
490     self.assertEqual(self.done.get_nowait(), 'ERR')
491     self.assertEqual(self.done.get_nowait(), 'ERR')
492     self.assertEqual(self.done.get_nowait(), 'ERR')
493     self.assertEqual(self.done.get_nowait(), 'ERR')
494     self.sl = locking.SharedLock(self.sl.name)
495
496   @_Repeat
497   def testExclusiveAcquireTimeout(self):
498     for shared in [0, 1]:
499       on_queue = threading.Event()
500       release_exclusive = threading.Event()
501
502       def _LockExclusive():
503         self.sl.acquire(shared=0, test_notify=on_queue.set)
504         self.done.put("A: start wait")
505         release_exclusive.wait()
506         self.done.put("A: end wait")
507         self.sl.release()
508
509       # Start thread to hold lock in exclusive mode
510       self._addThread(target=_LockExclusive)
511
512       # Wait for wait to begin
513       self.assertEqual(self.done.get(timeout=60), "A: start wait")
514
515       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
516       # on the queue
517       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
518                                       test_notify=release_exclusive.set))
519
520       self.done.put("got 2nd")
521       self.sl.release()
522
523       self._waitThreads()
524
525       self.assertEqual(self.done.get_nowait(), "A: end wait")
526       self.assertEqual(self.done.get_nowait(), "got 2nd")
527       self.assertRaises(Queue.Empty, self.done.get_nowait)
528
529   @_Repeat
530   def testAcquireExpiringTimeout(self):
531     def _AcquireWithTimeout(shared, timeout):
532       if not self.sl.acquire(shared=shared, timeout=timeout):
533         self.done.put("timeout")
534
535     for shared in [0, 1]:
536       # Lock exclusively
537       self.sl.acquire()
538
539       # Start shared acquires with timeout between 0 and 20 ms
540       for i in range(11):
541         self._addThread(target=_AcquireWithTimeout,
542                         args=(shared, i * 2.0 / 1000.0))
543
544       # Wait for threads to finish (makes sure the acquire timeout expires
545       # before releasing the lock)
546       self._waitThreads()
547
548       # Release lock
549       self.sl.release()
550
551       for _ in range(11):
552         self.assertEqual(self.done.get_nowait(), "timeout")
553
554       self.assertRaises(Queue.Empty, self.done.get_nowait)
555
556   @_Repeat
557   def testSharedSkipExclusiveAcquires(self):
558     # Tests whether shared acquires jump in front of exclusive acquires in the
559     # queue.
560
561     def _Acquire(shared, name, notify_ev, wait_ev):
562       if notify_ev:
563         notify_fn = notify_ev.set
564       else:
565         notify_fn = None
566
567       if wait_ev:
568         wait_ev.wait()
569
570       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
571         return
572
573       self.done.put(name)
574       self.sl.release()
575
576     # Get exclusive lock while we fill the queue
577     self.sl.acquire()
578
579     shrcnt1 = 5
580     shrcnt2 = 7
581     shrcnt3 = 9
582     shrcnt4 = 2
583
584     # Add acquires using threading.Event for synchronization. They'll be
585     # acquired exactly in the order defined in this list.
586     acquires = (shrcnt1 * [(1, "shared 1")] +
587                 3 * [(0, "exclusive 1")] +
588                 shrcnt2 * [(1, "shared 2")] +
589                 shrcnt3 * [(1, "shared 3")] +
590                 shrcnt4 * [(1, "shared 4")] +
591                 3 * [(0, "exclusive 2")])
592
593     ev_cur = None
594     ev_prev = None
595
596     for args in acquires:
597       ev_cur = threading.Event()
598       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
599       ev_prev = ev_cur
600
601     # Wait for last acquire to start
602     ev_prev.wait()
603
604     # Expect 6 pending exclusive acquires and 1 for all shared acquires
605     # together
606     self.assertEqual(self.sl._count_pending(), 7)
607
608     # Release exclusive lock and wait
609     self.sl.release()
610
611     self._waitThreads()
612
613     # Check sequence
614     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
615       # Shared locks aren't guaranteed to be notified in order, but they'll be
616       # first
617       tmp = self.done.get_nowait()
618       if tmp == "shared 1":
619         shrcnt1 -= 1
620       elif tmp == "shared 2":
621         shrcnt2 -= 1
622       elif tmp == "shared 3":
623         shrcnt3 -= 1
624       elif tmp == "shared 4":
625         shrcnt4 -= 1
626     self.assertEqual(shrcnt1, 0)
627     self.assertEqual(shrcnt2, 0)
628     self.assertEqual(shrcnt3, 0)
629     self.assertEqual(shrcnt3, 0)
630
631     for _ in range(3):
632       self.assertEqual(self.done.get_nowait(), "exclusive 1")
633
634     for _ in range(3):
635       self.assertEqual(self.done.get_nowait(), "exclusive 2")
636
637     self.assertRaises(Queue.Empty, self.done.get_nowait)
638
639   def testIllegalDowngrade(self):
640     # Not yet acquired
641     self.assertRaises(AssertionError, self.sl.downgrade)
642
643     # Acquire in shared mode, downgrade should be no-op
644     self.assertTrue(self.sl.acquire(shared=1))
645     self.assertTrue(self.sl.is_owned(shared=1))
646     self.assertTrue(self.sl.downgrade())
647     self.assertTrue(self.sl.is_owned(shared=1))
648     self.sl.release()
649
650   def testDowngrade(self):
651     self.assertTrue(self.sl.acquire())
652     self.assertTrue(self.sl.is_owned(shared=0))
653     self.assertTrue(self.sl.downgrade())
654     self.assertTrue(self.sl.is_owned(shared=1))
655     self.sl.release()
656
  @_Repeat
  def testDowngradeJumpsAheadOfExclusive(self):
    # Scenario: one thread holds the lock exclusively, a second exclusive
    # acquire and five shared acquires are pending. Once the holder
    # downgrades, the shared acquires are expected to get the lock while the
    # second exclusive acquire is still pending.

    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
      # Acquires exclusively, downgrades when told to and releases when told
      # to; ev_got is set once the lock is held
      self.assertTrue(self.sl.acquire())
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_got.set()
      ev_downgrade.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.assertTrue(self.sl.downgrade())
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    def _KeepExclusive2(ev_started, ev_release):
      # Acquires exclusively and releases when told to; ev_started is set via
      # the acquire's test_notify callback
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.sl.release()

    def _KeepShared(ev_started, ev_got, ev_release):
      # Acquires in shared mode and releases when told to; ev_started is set
      # via the acquire's test_notify callback, ev_got once the lock is held
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_got.set()
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    # Acquire lock in exclusive mode
    ev_got_excl1 = threading.Event()
    ev_downgrade_excl1 = threading.Event()
    ev_release_excl1 = threading.Event()
    th_excl1 = self._addThread(target=_KeepExclusive,
                               args=(ev_got_excl1, ev_downgrade_excl1,
                                     ev_release_excl1))
    ev_got_excl1.wait()

    # Start a second exclusive acquire
    ev_started_excl2 = threading.Event()
    ev_release_excl2 = threading.Event()
    th_excl2 = self._addThread(target=_KeepExclusive2,
                               args=(ev_started_excl2, ev_release_excl2))
    ev_started_excl2.wait()

    # Start shared acquires, will jump ahead of second exclusive acquire when
    # first exclusive acquire downgrades
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
    ev_release_shared = threading.Event()

    th_shared = [self._addThread(target=_KeepShared,
                                 args=(ev_started, ev_got, ev_release_shared))
                 for (ev_started, ev_got) in ev_shared]

    # Wait for all shared acquires to start
    for (ev, _) in ev_shared:
      ev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [("exclusive", [th_excl2.getName()]),
                      ("shared", sorted(th.getName() for th in th_shared))])

    # Shared acquires won't start until the exclusive lock is downgraded
    ev_downgrade_excl1.set()

    # Wait for all shared acquires to be successful
    for (_, ev) in ev_shared:
      ev.wait()

    # Check lock information again
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
                                              query.LQ_PENDING])),
                     [(self.sl.name, "shared", None,
                       [("exclusive", [th_excl2.getName()])])])
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
    self.assertEqual(set(owner), set([th_excl1.getName()] +
                                     [th.getName() for th in th_shared]))

    # Let all threads release the lock
    ev_release_excl1.set()
    ev_release_excl2.set()
    ev_release_shared.set()

    self._waitThreads()

    # Nobody must own or wait for the lock anymore
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
                                              query.LQ_PENDING])),
                     [(self.sl.name, None, None, [])])
749
750   @_Repeat
751   def testMixedAcquireTimeout(self):
752     sync = threading.Event()
753
754     def _AcquireShared(ev):
755       if not self.sl.acquire(shared=1, timeout=None):
756         return
757
758       self.done.put("shared")
759
760       # Notify main thread
761       ev.set()
762
763       # Wait for notification from main thread
764       sync.wait()
765
766       # Release lock
767       self.sl.release()
768
769     acquires = []
770     for _ in range(3):
771       ev = threading.Event()
772       self._addThread(target=_AcquireShared, args=(ev, ))
773       acquires.append(ev)
774
775     # Wait for all acquires to finish
776     for i in acquires:
777       i.wait()
778
779     self.assertEqual(self.sl._count_pending(), 0)
780
781     # Try to get exclusive lock
782     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
783
784     # Acquire exclusive without timeout
785     exclsync = threading.Event()
786     exclev = threading.Event()
787
788     def _AcquireExclusive():
789       if not self.sl.acquire(shared=0):
790         return
791
792       self.done.put("exclusive")
793
794       # Notify main thread
795       exclev.set()
796
797       # Wait for notification from main thread
798       exclsync.wait()
799
800       self.sl.release()
801
802     self._addThread(target=_AcquireExclusive)
803
804     # Try to get exclusive lock
805     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
806
807     # Make all shared holders release their locks
808     sync.set()
809
810     # Wait for exclusive acquire to succeed
811     exclev.wait()
812
813     self.assertEqual(self.sl._count_pending(), 0)
814
815     # Try to get exclusive lock
816     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
817
818     def _AcquireSharedSimple():
819       if self.sl.acquire(shared=1, timeout=None):
820         self.done.put("shared2")
821         self.sl.release()
822
823     for _ in range(10):
824       self._addThread(target=_AcquireSharedSimple)
825
826     # Tell exclusive lock to release
827     exclsync.set()
828
829     # Wait for everything to finish
830     self._waitThreads()
831
832     self.assertEqual(self.sl._count_pending(), 0)
833
834     # Check sequence
835     for _ in range(3):
836       self.assertEqual(self.done.get_nowait(), "shared")
837
838     self.assertEqual(self.done.get_nowait(), "exclusive")
839
840     for _ in range(10):
841       self.assertEqual(self.done.get_nowait(), "shared2")
842
843     self.assertRaises(Queue.Empty, self.done.get_nowait)
844
  def testPriority(self):
    # Queues many shared and exclusive acquires with pseudo-random priorities
    # while holding the lock, verifies the pending acquires reported by
    # GetLockInfo and finally checks the results against the expected order
    # (sorted by priority)

    # Acquire in exclusive mode
    self.assert_(self.sl.acquire(shared=0))

    # Queue acquires
    def _Acquire(prev, next, shared, priority, result):
      # Waits for the "prev" event before acquiring; "next" is set via the
      # acquire's test_notify callback, thereby letting the following acquire
      # start. "result" is put on the done queue while holding the lock.
      prev.wait()
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        # One shared and one exclusive acquire per chosen priority, in
        # shuffled order
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetLockInfo(set()),
                     [(self.sl.name, None, None, None)])
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive",
                       [threading.currentThread().getName()], None)])

    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
                            perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assert_(self.sl._check_empty())

    # Check acquires by priority
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one.
        while names:
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)
942
943   def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
944     self.assertEqual(name, self.sl.name)
945     self.assert_(mode is None)
946     self.assert_(owner is None)
947
948     self.assertEqual([(pendmode, sorted(waiting))
949                       for (pendmode, waiting) in pending],
950                      [(["exclusive", "shared"][int(bool(shared))],
951                        sorted(t.getName() for t in threads))
952                       for acquires in [perprio[i]
953                                        for i in sorted(perprio.keys())]
954                       for (shared, _, threads) in acquires])
955
956   class _FakeTimeForSpuriousNotifications:
957     def __init__(self, now, check_end):
958       self.now = now
959       self.check_end = check_end
960
961       # Deterministic random number generator
962       self.rnd = random.Random(15086)
963
964     def time(self):
965       # Advance time if the random number generator thinks so (this is to test
966       # multiple notifications without advancing the time)
967       if self.rnd.random() < 0.3:
968         self.now += self.rnd.random()
969
970       self.check_end(self.now)
971
972       return self.now
973
@_Repeat
def testAcquireTimeoutWithSpuriousNotifications(self):
  # A timed acquire is woken up many times while the lock is still held; it
  # must only succeed once the lock has really been released.
  start_time = 4000.0
  max_wait = 60.0
  notify_until = start_time + (max_wait * 0.8)

  waiting = threading.Event()
  acquired = threading.Event()
  requests = Queue.Queue(0)

  def check_end(now):
    # Runs every time the fake clock is asked for the time
    self.assertFalse(acquired.isSet())

    # True asks the main thread for one more notification; False (enough
    # virtual time has passed) asks it to release the lock
    requests.put(now < notify_until)

  fake_clock = self._FakeTimeForSpuriousNotifications(start_time, check_end)

  lock = locking.SharedLock("test", _time_fn=fake_clock.time)

  # Hold the lock exclusively so that the helper thread has to wait
  lock.acquire(shared=0)

  def fn():
    got_it = lock.acquire(shared=0, timeout=max_wait, test_notify=waiting.set)
    self.assertTrue(got_it)
    acquired.set()
    lock.release()
    self.done.put("success")

  # Start the timed acquire and wait until it is about to block
  self._addThread(target=fn)
  waiting.wait()

  # The helper thread is waiting for the lock now; every request read from
  # the queue is answered with one spurious notification. After sending it,
  # the lock takes a short while to notice and to ask for the time again.
  sent = 0
  while True:
    if not requests.get():
      break
    lock._notify_topmost()
    sent += 1

  self.assertTrue(sent > 100, "Not enough notifications were sent")

  self.assertFalse(acquired.isSet())

  # Enough notifications went out, release the lock for real
  lock.release()

  # Wait until the helper thread owns the lock
  acquired.wait()

  self._waitThreads()

  self.assertEqual(self.done.get_nowait(), "success")
  self.assertRaises(Queue.Empty, self.done.get_nowait)
1033
1034
class TestSharedLockInCondition(_ThreadedTestCase):
  """SharedLock as a condition lock tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Create the condition under test around C{self.sl}.

    Kept as a separate method so that subclasses can plug in another
    condition implementation.

    """
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    # Waiting on the condition (with a zero timeout) must hand the lock back
    # in the mode it was acquired in; shared first, then exclusive
    for shared in (1, 0):
      self.cond.acquire(shared=shared)
      self.assertTrue(self.sl.is_owned(shared=shared))
      self.cond.wait(0)
      self.assertTrue(self.sl.is_owned(shared=shared))
      self.cond.release()
1057
1058
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """Tests for SharedLock used as the lock of a pipe condition.

  """
  def setCondition(self):
    """Create a C{locking.PipeCondition} around the lock under test.

    """
    shared_lock = self.sl
    self.cond = locking.PipeCondition(shared_lock)
1064
1065
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Report 'EXC' after checking that the decorator lock is owned.

    """
    self.assertTrue(_decoratorlock.is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Report 'SHR' after checking that the lock is owned in shared mode.

    """
    self.assertTrue(_decoratorlock.is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    # The lock must not be owned anymore once a decorated call has returned
    self._doItExclusive()
    self.assertFalse(_decoratorlock.is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock.is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    try:
      threading.Thread(target=self._doItSharer).start()
      self.assertTrue(self.done.get(True, 1))
    finally:
      # The lock is a module-level global; always give it back so that a
      # failure here doesn't leave it held for the following tests
      _decoratorlock.release()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItExclusive)
      # Nothing may have been reported while this thread holds the lock
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItSharer)
      # Nothing may have been reported while this thread holds the lock
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    try:
      self._addThread(target=self._doItExclusive)
      # Nothing may have been reported while this thread holds the lock
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')
1121
1122
class TestLockSet(_ThreadedTestCase):
  """LockSet tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self._setUpLS()

  def _setUpLS(self):
    """Helper to (re)initialize the lock set"""
    self.resources = ['one', 'two', 'three']
    self.ls = locking.LockSet(self.resources, "TestLockSet")

  def testResources(self):
    # A lock set reports exactly the names it was created with
    self.assertEqual(self.ls._names(), set(self.resources))
    newls = locking.LockSet([], "TestLockSet.testResources")
    self.assertEqual(newls._names(), set())

  def testCheckOwnedUnknown(self):
    # While owning nothing, unknown names are simply reported as not owned
    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
    for shared in [-1, 0, 1, 6378, 24255]:
      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
                                           shared=shared))

  def testCheckOwnedUnknownWhileHolding(self):
    # While owning something, asking for an unknown name is an error
    self.assertFalse(self.ls.check_owned([]))
    self.ls.acquire("one", shared=1)
    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertFalse(self.ls.check_owned(["one", "two"]))
    self.assertRaises(errors.LockError, self.ls.check_owned,
                      ["one", "nonexist"])
    self.assertRaises(errors.LockError, self.ls.check_owned, "")
    self.ls.release()
    self.assertFalse(self.ls.check_owned([]))
    self.assertFalse(self.ls.check_owned("one"))

  def testAcquireRelease(self):
    # Single names and lists of names, released partially and completely
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    self.assertTrue(self.ls.acquire('one'))
    self.assertEqual(self.ls.list_owned(), set(['one']))
    self.assertTrue(self.ls.check_owned("one"))
    self.assertTrue(self.ls.check_owned("one", shared=0))
    self.assertFalse(self.ls.check_owned("one", shared=1))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    self.assertEqual(self.ls.acquire(['one']), set(['one']))
    self.assertEqual(self.ls.list_owned(), set(['one']))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    self.ls.acquire(['one', 'two', 'three'])
    self.assertEqual(self.ls.list_owned(), set(['one', 'two', 'three']))
    self.assertTrue(self.ls.check_owned(self.ls._names()))
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
    self.ls.release('one')
    self.assertFalse(self.ls.check_owned(["one"]))
    self.assertTrue(self.ls.check_owned(["two", "three"]))
    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
    self.assertEqual(self.ls.list_owned(), set(['two', 'three']))
    self.ls.release(['three'])
    self.assertEqual(self.ls.list_owned(), set(['two']))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    self.assertEqual(self.ls.acquire(['one', 'three']), set(['one', 'three']))
    self.assertEqual(self.ls.list_owned(), set(['one', 'three']))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    for name in self.ls._names():
      self.assertFalse(self.ls.check_owned(name))

  def testNoDoubleAcquire(self):
    # Acquiring again while already owning something must fail
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
    self.ls.release()
    self.ls.acquire(['one', 'three'])
    self.ls.release('one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.ls.release('three')

  def testNoWrongRelease(self):
    # Releasing something which isn't owned must fail
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.release, 'two')

  def testAddRemove(self):
    # Adding and removing names, both with and without owning locks
    self.ls.add('four')
    self.assertEqual(self.ls.list_owned(), set())
    self.assertTrue('four' in self.ls._names())
    self.ls.add(['five', 'six', 'seven'], acquired=1)
    self.assertTrue('five' in self.ls._names())
    self.assertTrue('six' in self.ls._names())
    self.assertTrue('seven' in self.ls._names())
    self.assertEqual(self.ls.list_owned(), set(['five', 'six', 'seven']))
    self.assertEqual(self.ls.remove(['five', 'six']), ['five', 'six'])
    self.assertTrue('five' not in self.ls._names())
    self.assertTrue('six' not in self.ls._names())
    self.assertEqual(self.ls.list_owned(), set(['seven']))
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
    self.ls.remove('seven')
    self.assertTrue('seven' not in self.ls._names())
    self.assertEqual(self.ls.list_owned(), set([]))
    self.ls.acquire(None, shared=1)
    self.assertRaises(AssertionError, self.ls.add, 'eight')
    self.ls.release()
    self.ls.acquire(None)
    self.ls.add('eight', acquired=1)
    self.assertTrue('eight' in self.ls._names())
    self.assertTrue('eight' in self.ls.list_owned())
    self.ls.add('nine')
    self.assertTrue('nine' in self.ls._names())
    self.assertTrue('nine' not in self.ls.list_owned())
    self.ls.release()
    self.ls.remove(['two'])
    self.assertTrue('two' not in self.ls._names())
    self.ls.acquire('three')
    self.assertEqual(self.ls.remove(['three']), ['three'])
    self.assertTrue('three' not in self.ls._names())
    self.assertEqual(self.ls.remove('three'), [])
    self.assertEqual(self.ls.remove(['one', 'three', 'six']), ['one'])
    self.assertTrue('one' not in self.ls._names())

  def testRemoveNonBlocking(self):
    # Exclusively owned locks can be removed directly
    self.ls.acquire('one')
    self.assertEqual(self.ls.remove('one'), ['one'])
    self.ls.acquire(['two', 'three'])
    self.assertEqual(self.ls.remove(['two', 'three']),
                     ['two', 'three'])

  def testNoDoubleAdd(self):
    # Adding an existing name must fail
    self.assertRaises(errors.LockError, self.ls.add, 'two')
    self.ls.add('four')
    self.assertRaises(errors.LockError, self.ls.add, 'four')

  def testNoWrongRemoves(self):
    self.ls.acquire(['one', 'three'], shared=1)
    # Cannot remove 'two' while holding something which is not a superset
    self.assertRaises(AssertionError, self.ls.remove, 'two')
    # Cannot remove 'three' as we are sharing it
    self.assertRaises(AssertionError, self.ls.remove, 'three')

  def testAcquireSetLock(self):
    # acquire the set-lock exclusively
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    self.assertEqual(self.ls.list_owned(), set(['one', 'two', 'three']))
    self.assertEqual(self.ls.is_owned(), True)
    self.assertEqual(self.ls._names(), set(['one', 'two', 'three']))
    # I can still add/remove elements...
    self.assertEqual(self.ls.remove(['two', 'three']), ['two', 'three'])
    self.assertTrue(self.ls.add('six'))
    self.ls.release()
    # share the set-lock
    self.assertEqual(self.ls.acquire(None, shared=1), set(['one', 'six']))
    # adding new elements is not possible
    self.assertRaises(AssertionError, self.ls.add, 'five')
    self.ls.release()

  def testAcquireWithRepetitions(self):
    # Names given more than once are acquired and released only once
    self.assertEqual(self.ls.acquire(['two', 'two', 'three'], shared=1),
                     set(['two', 'two', 'three']))
    self.ls.release(['two', 'two'])
    self.assertEqual(self.ls.list_owned(), set(['three']))

  def testEmptyAcquire(self):
    # Acquire an empty list of locks...
    self.assertEqual(self.ls.acquire([]), set())
    self.assertEqual(self.ls.list_owned(), set())
    # New locks can still be added
    self.assertTrue(self.ls.add('six'))
    # "re-acquiring" is not an issue, since we had really acquired nothing
    self.assertEqual(self.ls.acquire([], shared=1), set())
    self.assertEqual(self.ls.list_owned(), set())
    # We haven't really acquired anything, so we cannot release
    self.assertRaises(AssertionError, self.ls.release)

  def _doLockSet(self, names, shared):
    """Acquire locks, report 'DONE' and release everything again.

    'ERR' is reported if a L{errors.LockError} is raised on the way.

    @param names: passed to C{self.ls.acquire} as the names to lock
    @param shared: passed to C{self.ls.acquire} as the C{shared} flag

    """
    try:
      self.ls.acquire(names, shared=shared)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doAddSet(self, names):
    """Add already acquired locks, report 'DONE' and release them again.

    'ERR' is reported if a L{errors.LockError} is raised on the way.

    @param names: passed to C{self.ls.add} as the names to add

    """
    try:
      self.ls.add(names, acquired=1)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doRemoveSet(self, names):
    """Remove locks and report the return value of C{self.ls.remove}.

    @param names: passed to C{self.ls.remove} as the names to remove

    """
    self.done.put(self.ls.remove(names))

  @_Repeat
  def testConcurrentSharedAcquire(self):
    # Other sharers proceed while this thread shares 'one' and 'two';
    # exclusive acquires have to wait for the release
    self.ls.acquire(['one', 'two'], shared=1)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testConcurrentExclusiveAcquire(self):
    # Only 'three' stays available while 'one' and 'two' are held
    # exclusively; everything touching the held names waits for the release
    self.ls.acquire(['one', 'two'])
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 0))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._addThread(target=self._doLockSet, args=('one', 0))
    self._addThread(target=self._doLockSet, args=('one', 1))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(6):
      self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testSimpleAcquireTimeoutExpiring(self):
    names = sorted(self.ls._names())
    self.assertTrue(len(names) >= 3)

    # Get name of first lock
    first = names[0]

    # Get name of last lock
    last = names.pop()

    # Each entry is (name to acquire in the thread, name blocked beforehand)
    checks = [
      # Block first and try to lock it again
      (first, first),

      # Block first and try to lock all locks
      (None, first),

      # Block last and try to lock it again
      (last, last),
      ]

    for (wanted, block) in checks:
      # Lock in exclusive mode
      self.assertTrue(self.ls.acquire(block, shared=0))

      def _AcquireOne():
        # Try to get the same lock again with a timeout (should never succeed)
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
        if acquired:
          self.done.put("acquired")
          self.ls.release()
        else:
          self.assertTrue(acquired is None)
          self.assertFalse(self.ls.list_owned())
          self.assertFalse(self.ls.is_owned())
          self.done.put("not acquired")

      self._addThread(target=_AcquireOne)

      # Wait for timeout in thread to expire
      self._waitThreads()

      # Release exclusive lock again
      self.ls.release()

      self.assertEqual(self.done.get_nowait(), "not acquired")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testDelayedAndExpiringLockAcquire(self):
    self._setUpLS()
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])

    for expire in (False, True):
      names = sorted(self.ls._names())
      self.assertEqual(len(names), 8)

      lock_ev = dict([(i, threading.Event()) for i in names])

      # Lock all in exclusive mode
      self.assertTrue(self.ls.acquire(names, shared=0))

      if expire:
        # We'll wait at least 300ms per lock
        lockwait = len(names) * [0.3]

        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
        # this gives us up to 2.4s to fail.
        lockall_timeout = 0.4
      else:
        # This should finish rather quickly
        lockwait = None
        lockall_timeout = len(names) * 5.0

      def _LockAll():
        def acquire_notification(name):
          if not expire:
            self.done.put("getting %s" % name)

          # Kick next lock
          lock_ev[name].set()

        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                           test_notify=acquire_notification):
          self.done.put("got all")
          self.ls.release()
        else:
          self.done.put("timeout on all")

        # Notify all locks
        for ev in lock_ev.values():
          ev.set()

      t = self._addThread(target=_LockAll)

      for idx, name in enumerate(names):
        # Wait for actual acquire on this lock to start
        lock_ev[name].wait(10.0)

        if expire and t.isAlive():
          # Wait some time after getting the notification to make sure the lock
          # acquire will expire
          SafeSleep(lockwait[idx])

        self.ls.release(names=name)

      self.assertFalse(self.ls.list_owned())

      self._waitThreads()

      if expire:
        # Not checking which locks were actually acquired. Doing so would be
        # too timing-dependent.
        self.assertEqual(self.done.get_nowait(), "timeout on all")
      else:
        for i in names:
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
        self.assertEqual(self.done.get_nowait(), "got all")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testConcurrentRemove(self):
    self.ls.add('four')
    self.ls.acquire(['one', 'two', 'four'])
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    # All waiting threads asked for 'one', so removing it makes them fail
    self.ls.remove('one')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.ls.add(['five', 'six'], acquired=1)
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
    # Nobody asked for 'five', so removing it doesn't hurt the waiters
    self.ls.remove('five')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(['three', 'four'])
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('four')
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['six'])
    # "args" must be a tuple, hence the trailing comma
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    self.ls.release()
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentSharedSetLock(self):
    # share the set-lock...
    self.ls.acquire(None, shared=1)
    # ...another thread can share it too
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...or just share some elements
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...but not add new ones or remove any
    t = self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    # this just releases the set-lock
    self.ls.release([])
    t.join(60)
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # release the lock on the actual elements so remove() can proceed too
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentExclusiveSetLock(self):
    # acquire the set-lock...
    self.ls.acquire(None, shared=0)
    # ...no one can do anything else
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(['three'], 0))
    self._addThread(target=self._doLockSet, args=(['two'], 1))
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(5):
      self.assertEqual(self.done.get(True, 1), 'DONE')
    # cleanup
    self._setUpLS()

  @_Repeat
  def testConcurrentSetLockAdd(self):
    self.ls.acquire('one')
    # Another thread wants the whole SetLock
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(AssertionError, self.ls.add, 'four')
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(None)
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.add('four')
    self.ls.add('five', acquired=1)
    self.ls.add('six', acquired=1, shared=1)
    self.assertEqual(self.ls.list_owned(),
                     set(['one', 'two', 'three', 'five', 'six']))
    self.assertEqual(self.ls.is_owned(), True)
    self.assertEqual(self.ls._names(),
                     set(['one', 'two', 'three', 'four', 'five', 'six']))
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  @_Repeat
  def testEmptyLockSet(self):
    # get the set-lock
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    # now empty it...
    self.ls.remove(['one', 'two', 'three'])
    # and adds/locks by another thread still wait
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    # empty it again...
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
    # now share it...
    self.assertEqual(self.ls.acquire(None, shared=1), set())
    # other sharers can go, adds still wait
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  def testAcquireWithNamesDowngrade(self):
    # Acquiring by name doesn't leave the lockset-internal lock owned
    self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
    self.assertTrue(self.ls.is_owned())
    self.assertFalse(self.ls._get_lock().is_owned())
    self.ls.release()
    self.assertFalse(self.ls.is_owned())
    self.assertFalse(self.ls._get_lock().is_owned())
    # Can't downgrade after releasing
    self.assertRaises(AssertionError, self.ls.downgrade, "two")

  def testDowngrade(self):
    # Not owning anything, must raise an exception
    self.assertFalse(self.ls.is_owned())
    self.assertRaises(AssertionError, self.ls.downgrade)

    self.assertFalse(compat.any(i.is_owned()
                                for i in self.ls._get_lockdict().values()))
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    for name in self.ls._names():
      self.assertFalse(self.ls.check_owned(name))

    self.assertEqual(self.ls.acquire(None, shared=0),
                     set(["one", "two", "three"]))
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")

    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
    for name in self.ls._names():
      self.assertTrue(self.ls.check_owned(name))
      self.assertTrue(self.ls.check_owned(name, shared=0))
      self.assertFalse(self.ls.check_owned(name, shared=1))

    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    self.assertTrue(compat.all(i.is_owned(shared=0)
                               for i in self.ls._get_lockdict().values()))

    # Start downgrading locks
    self.assertTrue(self.ls.downgrade(names=["one"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertTrue(self.ls.check_owned("two", shared=0))
    self.assertTrue(self.ls.check_owned("three", shared=0))

    # Downgrade second lock
    self.assertTrue(self.ls.downgrade(names="two"))
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertFalse(self.ls.check_owned("two", shared=0))
    self.assertTrue(self.ls.check_owned("two", shared=1))
    self.assertTrue(self.ls.check_owned("three", shared=0))

    # Downgrading the last exclusive lock to shared must downgrade the
    # lockset-internal lock too
    self.assertTrue(self.ls.downgrade(names="three"))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.assertTrue(compat.all(i.is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    # Verify owned locks
    for name in self.ls._names():
      self.assertTrue(self.ls.check_owned(name, shared=1))

    # Downgrading a shared lock must be a no-op
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.assertTrue(compat.all(i.is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    self.ls.release()

  def testPriority(self):
    def _Acquire(prev, next_ev, name, priority, success_fn):
      # Threads are chained through events: wait for the previous acquire to
      # have started, then signal the next one from the notification hook
      prev.wait()
      self.assertTrue(self.ls.acquire(name, shared=0,
                                      priority=priority,
                                      test_notify=lambda _: next_ev.set()))
      try:
        success_fn()
      finally:
        self.ls.release()

    # Get all in exclusive mode
    self.assertTrue(self.ls.acquire(locking.ALL_SET, shared=0))

    done_two = Queue.Queue(0)

    first = threading.Event()
    prev = first

    acquires = [("one", prio, self.done) for prio in range(1, 33)]
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])

    # Use a deterministic random generator
    random.Random(741).shuffle(acquires)

    for (name, prio, done) in acquires:
      ev = threading.Event()
      self._addThread(target=_Acquire,
                      args=(prev, ev, name, prio,
                            compat.partial(done.put, "Prio%s" % prio)))
      prev = ev

    # Start acquires
    first.set()

    # Wait for last acquire to start
    prev.wait()

    # Let threads acquire locks
    self.ls.release()

    # Wait for threads to finish
    self._waitThreads()

    # Results must arrive in ascending priority order for both locks
    for i in range(1, 33):
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)

    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1756
1757
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for locking.GanetiLockManager.

  Every test starts with a manager built from two nodes, two node groups and
  three instances. The class-level "_instance" reference is reset after each
  test so that the next one can create a manager of its own.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.nodes = ['n1', 'n2']
    self.nodegroups = ['g1', 'g2']
    self.instances = ['i1', 'i2', 'i3']
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
                                        self.instances)

  def tearDown(self):
    # Don't try this at home...
    # Creating a second manager fails (see testDoubleGLFails), hence the
    # reference to the current one has to be dropped between tests.
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    # A manager already exists (created in setUp), a second one must fail
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])

  def testLockNames(self):
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testInitAndResources(self):
    # No nodes, node groups or instances at all
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager([], [], [])
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())

    # Nodes and node groups, but no instances
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())

    # Instances only
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager([], [], self.instances)
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testAcquireRelease(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
                                        shared=1))
    # Only "i1" is owned, hence the check for both names must fail
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
    # Releasing a single name must leave the other locks untouched
    self.GL.release(locking.LEVEL_NODE, ['n2'])
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    # Releasing without names drops everything owned at that level
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    # "i5" was never added to the manager
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))

  def testAcquireWholeSets(self):
    # Passing None as names acquires all locks of a level
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                     set(self.nodegroups))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    # A whole level (instances) combined with a single name at another level
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(['n2']))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    # Without owning the BGL no other level may be locked
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1', 'n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i3'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ['g1'])
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # The BGL can't be released while a node lock is owned ...
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)
    # ... or while instance locks are owned ...
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)
    # ... or while at least one node group lock is still owned
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testWrongOrder(self):
    # With a node lock owned, further acquires at the node, node group and
    # instance levels must all be refused
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ['g1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i2'])

  def testModifiableLevels(self):
    # Locks can't be added to or removed from the cluster level, while the
    # instance, node and node group levels accept both operations
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
                      ['BGL2'])
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
    self.GL.add(locking.LEVEL_NODE, ['n3'])
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
                      ['BGL2'])

  def _doLock(self, level, names, shared):
    """Thread helper acquiring the BGL (shared) and locks at another level.

    Puts "DONE" on the result queue once the locks at C{level} are held and
    releases everything afterwards; puts "ERR" if a lock error is raised.

    @param level: locking level to acquire locks at
    @param names: lock name(s) to acquire at that level
    @param shared: passed on as the "shared" argument of the acquire

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
      self.GL.acquire(level, names, shared=shared)
      self.done.put('DONE')
      self.GL.release(level)
      self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put('ERR')

  @_Repeat
  def testConcurrency(self):
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # Nothing is locked at the instance level, the thread gets "i1" right away
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # Owning "i3" here must not affect a thread asking for "i1"
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # A thread asking for "i3" has to wait until it's released here
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # A lock held in shared mode can be shared with another thread ...
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ... but a non-shared acquire has to wait for the release
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1971
1972
class TestLockMonitor(_ThreadedTestCase):
  """Tests for locking.LockMonitor.

  Locks are created with the monitor under test passed as "monitor" and the
  information returned by the monitor's QueryLocks method is compared with
  the expected lock state.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()

  def testSingleThread(self):
    # Locks created with a monitor show up in it and must disappear again
    # once nothing references them anymore
    locks = []

    for i in range(100):
      name = "TestLock%s" % i
      locks.append(locking.SharedLock(name, monitor=self.lm))

    self.assertEqual(len(self.lm._locks), len(locks))
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
    self.assertEqual(len(result.fields), 1)
    self.assertEqual(len(result.data), 100)

    # Delete all locks
    del locks[:]

    # The garbage collector might need some time
    def _CheckLocks():
      if self.lm._locks:
        raise utils.RetryAgain()

    utils.Retry(_CheckLocks, 0.1, 30.0)

    self.assertFalse(self.lm._locks)

  def testMultiThread(self):
    locks = []

    def _CreateLock(prev_ev, next_ev, name):
      # Threads are chained via events so that the locks are created in a
      # well-defined order
      prev_ev.wait()
      locks.append(locking.SharedLock(name, monitor=self.lm))
      if next_ev:
        next_ev.set()

    expnames = []

    first = threading.Event()
    prev = first

    # Use a deterministic random generator
    for i in random.Random(4263).sample(range(100), 33):
      name = "MtTestLock%s" % i
      expnames.append(name)

      ev = threading.Event()
      self._addThread(target=_CreateLock, args=(prev, ev, name))
      prev = ev

    # Add locks
    first.set()
    self._waitThreads()

    # Check order in which locks were added
    self.assertEqual([i.name for i in locks], expnames)

    # Check query result
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
    self.assertTrue(isinstance(result, dict))
    response = objects.QueryResponse.FromDict(result)
    self.assertEqual(response.data,
                     [[(constants.RS_NORMAL, name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, [])]
                      for name in utils.NiceSort(expnames)])
    self.assertEqual(len(response.fields), 4)
    self.assertEqual(["name", "mode", "owner", "pending"],
                     [fdef.name for fdef in response.fields])

    # Test exclusive acquire
    for tlock in locks[::4]:
      tlock.acquire(shared=0)
      try:
        def _GetExpResult(name):
          # Only the lock held by this thread may be reported as owned
          if tlock.name == name:
            return [(constants.RS_NORMAL, name),
                    (constants.RS_NORMAL, "exclusive"),
                    (constants.RS_NORMAL,
                     [threading.currentThread().getName()]),
                    (constants.RS_NORMAL, [])]
          return [(constants.RS_NORMAL, name),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, [])]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [_GetExpResult(name)
                          for name in utils.NiceSort(expnames)])
      finally:
        tlock.release()

    # Test shared acquire
    def _Acquire(lock, shared, ev, notify):
      # Hold the lock until "ev" is set; "notify" tells the main thread that
      # the lock has been acquired
      lock.acquire(shared=shared)
      try:
        notify.set()
        ev.wait()
      finally:
        lock.release()

    for tlock1 in locks[::11]:
      for tlock2 in locks[::-15]:
        if tlock2 == tlock1:
          # Avoid deadlocks
          continue

        for tlock3 in locks[::10]:
          if tlock3 in (tlock2, tlock1):
            # Avoid deadlocks
            continue

          releaseev = threading.Event()

          # Acquire locks: three threads share the first lock, one thread
          # shares the second and one holds the third exclusively
          acquireev = []
          tthreads1 = []
          for _ in range(3):
            ev = threading.Event()
            tthreads1.append(self._addThread(target=_Acquire,
                                             args=(tlock1, 1, releaseev, ev)))
            acquireev.append(ev)

          ev = threading.Event()
          tthread2 = self._addThread(target=_Acquire,
                                     args=(tlock2, 1, releaseev, ev))
          acquireev.append(ev)

          ev = threading.Event()
          tthread3 = self._addThread(target=_Acquire,
                                     args=(tlock3, 0, releaseev, ev))
          acquireev.append(ev)

          # Wait for all locks to be acquired
          for i in acquireev:
            i.wait()

          # Check query result
          result = self.lm.QueryLocks(["name", "mode", "owner"])
          response = objects.QueryResponse.FromDict(result)
          for (name, mode, owner) in response.data:
            (name_status, name_value) = name
            (owner_status, owner_value) = owner

            self.assertEqual(name_status, constants.RS_NORMAL)
            self.assertEqual(owner_status, constants.RS_NORMAL)

            if name_value == tlock1.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(set(owner_value),
                               set(i.getName() for i in tthreads1))
              continue

            if name_value == tlock2.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(owner_value, [tthread2.getName()])
              continue

            if name_value == tlock3.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
              self.assertEqual(owner_value, [tthread3.getName()])
              continue

            # All other locks must be reported as not being held
            self.assertTrue(name_value in expnames)
            self.assertEqual(mode, (constants.RS_NORMAL, None))
            self.assertTrue(owner_value is None)

          # Release locks again
          releaseev.set()

          self._waitThreads()

          result = self.lm.QueryLocks(["name", "mode", "owner"])
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
                           [[(constants.RS_NORMAL, name),
                             (constants.RS_NORMAL, None),
                             (constants.RS_NORMAL, None)]
                            for name in utils.NiceSort(expnames)])

  def testDelete(self):
    lock = locking.SharedLock("TestLock", monitor=self.lm)

    self.assertEqual(len(self.lm._locks), 1)
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]])

    lock.delete()

    # A deleted lock which is still referenced stays in the monitor and is
    # reported with mode "deleted"
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]])
    self.assertEqual(len(self.lm._locks), 1)

  def testPending(self):
    def _Acquire(lock, shared, prev_ev, next_ev):
      # Threads are chained: each waits for its predecessor and starts the
      # next one through the "test_notify" callback
      prev_ev.wait()

      lock.acquire(shared=shared, test_notify=next_ev.set)
      try:
        pass
      finally:
        lock.release()

    lock = locking.SharedLock("ExcLock", monitor=self.lm)

    # Pending acquires are checked in exclusive (0) and in shared (1) mode
    for shared in [0, 1]:
      lock.acquire()
      try:
        self.assertEqual(len(self.lm._locks), 1)
        result = self.lm.QueryLocks(["name", "mode", "owner"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()])]])

        threads = []

        first = threading.Event()
        prev = first

        for _ in range(5):
          ev = threading.Event()
          threads.append(self._addThread(target=_Acquire,
                                         args=(lock, shared, prev, ev)))
          prev = ev

        # Start acquires
        first.set()

        # Wait for last acquire to start waiting
        prev.wait()

        # NOTE: This works only because QueryLocks will acquire the
        # lock-internal lock again and won't be able to get the information
        # until it has the lock. By then the acquire should be registered in
        # SharedLock.__pending (otherwise it's a bug).

        # All acquires are waiting now; shared ones are expected as a single
        # entry, exclusive ones as one entry per thread
        if shared:
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
        else:
          pending = [("exclusive", [t.getName()]) for t in threads]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()]),
                           (constants.RS_NORMAL, pending)]])

        self.assertEqual(len(self.lm._locks), 1)
      finally:
        lock.release()

      self._waitThreads()

      # No pending acquires
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       [[(constants.RS_NORMAL, lock.name),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, [])]])

      self.assertEqual(len(self.lm._locks), 1)

  def testDeleteAndRecreate(self):
    lname = "TestLock101923193"

    # Create some locks with the same name and keep all references
    locks = [locking.SharedLock(lname, monitor=self.lm)
             for _ in range(5)]

    self.assertEqual(len(self.lm._locks), len(locks))

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 5)

    locks[2].delete()

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2 +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]] +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2)

    locks[1].acquire(shared=0)

    # Expected information, in the order in which the locks were created
    last_status = [
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "exclusive"),
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "deleted"),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      ]

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)

    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
    self.assertEqual(len(self.lm._locks), len(locks))

    # Check lock deletion: drop the references one by one, oldest lock first
    for idx in range(len(locks)):
      del locks[0]
      assert gc.isenabled()
      gc.collect()
      self.assertEqual(len(self.lm._locks), len(locks))
      result = self.lm.QueryLocks(["name", "mode", "owner"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       last_status[idx + 1:])

    # All locks should have been deleted
    assert not locks
    self.assertFalse(self.lm._locks)

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])

  class _FakeLock:
    """Lock stand-in returning canned results from GetLockInfo.

    Expected requests and their results are queued with AddResult and
    consumed in the same order by GetLockInfo.

    """
    def __init__(self):
      self._info = []

    def AddResult(self, *args):
      # "args" is expected to be (expected request, result), see GetLockInfo
      self._info.append(args)

    def CountPending(self):
      # Number of queued results not yet consumed
      return len(self._info)

    def GetLockInfo(self, requested):
      (exp_requested, result) = self._info.pop(0)

      if exp_requested != requested:
        raise Exception("Requested information (%s) does not match"
                        " expectations (%s)" % (requested, exp_requested))

      return result

  def testMultipleResults(self):
    fl1 = self._FakeLock()
    fl2 = self._FakeLock()

    self.lm.RegisterLock(fl1)
    self.lm.RegisterLock(fl2)

    # Empty information
    for i in [fl1, fl2]:
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
    for i in [fl1, fl2]:
      self.assertEqual(i.CountPending(), 0)

    # Check ordering
    for fn in [lambda x: x, reversed, sorted]:
      # Whatever the incoming order, the result must be sorted by name
      fl1.AddResult(set(), list(fn([
        ("aaa", None, None, None),
        ("bbb", None, None, None),
        ])))
      fl2.AddResult(set(), [])
      result = self.lm.QueryLocks(["name"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
        [(constants.RS_NORMAL, "aaa")],
        [(constants.RS_NORMAL, "bbb")],
        ])
      for i in [fl1, fl2]:
        self.assertEqual(i.CountPending(), 0)

      for fn2 in [lambda x: x, reversed, sorted]:
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
          # Same name, but different information
          ("aaa", "mode0", None, None),
          ("aaa", "mode1", None, None),
          ("aaa", "mode2", None, None),
          ("aaa", "mode3", None, None),
          ])))
        fl2.AddResult(set([query.LQ_MODE]), [
          ("zzz", "end", None, None),
          ("000", "start", None, None),
          ] + list(fn2([
          ("aaa", "b200", None, None),
          ("aaa", "b300", None, None),
          ])))
        result = self.lm.QueryLocks(["name", "mode"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
          ] + list(fn([
          # Name is the same, so order must be equal to incoming order
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
          ])) + list(fn2([
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
          ])) + [
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
          ])
        for i in [fl1, fl2]:
          self.assertEqual(i.CountPending(), 0)
2405
2406
# Run the tests through the project's test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()