SetEtcHostsEntry: maintain existing ordering
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
# Lock used to test the locking.ssynchronized decorator.
# Since it's passed as input to a decorator it must be declared as a global.
_decoratorlock = locking.SharedLock("decorator lock")

#: Sequence iterated over by the _Repeat decorator; its length is the number
#: of times a repeated test body is executed
ITERATIONS = range(8)
51
52
def _Repeat(fn):
  """Decorator running the wrapped callable once per entry of ITERATIONS.

  All arguments are forwarded unchanged on every run; the return values of
  the wrapped callable are discarded.

  """
  def wrapper(*args, **kwargs):
    for _ in ITERATIONS:
      fn(*args, **kwargs)

  return wrapper
59
60
def SafeSleep(duration):
  """Sleep until at least C{duration} seconds have passed.

  C{time.sleep} is called again with the remaining time whenever it returns
  before the deadline; a zero or negative duration returns immediately.

  @param duration: number of seconds to sleep

  """
  deadline = time.time() + duration
  remaining = deadline - time.time()
  while remaining > 0.0:
    time.sleep(remaining)
    remaining = deadline - time.time()
68
69
class _ThreadedTestCase(unittest.TestCase):
  """Base test case keeping track of the threads it starts.

  Offers a C{done} queue on which worker threads can report, plus helpers to
  start threads and to wait for all of them.

  """
  def setUp(self):
    unittest.TestCase.setUp(self)
    # Queue without size limit, used by worker threads to report back
    self.done = Queue.Queue(0)
    self.threads = []

  def _addThread(self, *args, **kwargs):
    """Start a new thread and register it for L{_waitThreads}.

    All arguments are handed to C{threading.Thread}.

    @return: the started thread

    """
    thread = threading.Thread(*args, **kwargs)
    self.threads.append(thread)
    thread.start()
    return thread

  def _waitThreads(self):
    """Join every registered thread and fail if one is still running.

    Each thread is given up to 60 seconds; the list of registered threads is
    reset once all of them have finished.

    """
    for thread in self.threads:
      thread.join(60)
      self.assertFalse(thread.isAlive())
    self.threads = []
90
91
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the tests of the condition classes"""

  def setUp(self, cls):
    """Prepare a fresh lock and a condition of class C{cls} built on it."""
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _checkNotOwned(self):
    """Verify the condition is not owned and refuses wait and notifyAll."""
    self.assertFalse(self.cond._is_owned())
    for name in ("wait", "notifyAll"):
      self.assertRaises(RuntimeError, getattr(self.cond, name))

  def _testAcquireRelease(self):
    """Check ownership before, during and after an acquire/release cycle."""
    self._checkNotOwned()

    self.cond.acquire()
    self.assertTrue(self.cond._is_owned())
    self.cond.notifyAll()
    self.assertTrue(self.cond._is_owned())
    self.cond.release()

    self._checkNotOwned()

  def _testNotification(self):
    """Check that a waiter is woken up by notifyAll from another thread."""
    def _Notifier():
      report = self.done.put
      report("NE")
      self.cond.acquire()
      report("NA")
      self.cond.notifyAll()
      report("NN")
      self.cond.release()

    self.cond.acquire()
    self._addThread(target=_Notifier)
    self.assertEqual(self.done.get(True, 1), "NE")
    # Nothing else may be reported while the condition is still held here
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.wait()
    for expected in ("NA", "NN"):
      self.assertEqual(self.done.get(True, 1), expected)
    self.assertTrue(self.cond._is_owned())
    self.cond.release()
    self.assertFalse(self.cond._is_owned())
134
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """Tests for locking.SingleNotifyPipeCondition"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    """Waiting more than once with a timeout must work."""
    self.cond.acquire()
    for timeout in (0, 0.1):
      self.cond.wait(timeout)
    self.cond.release()

  def testNoNotifyReuse(self):
    """After a notification both wait and notifyAll must be refused."""
    self.cond.acquire()
    self.cond.notifyAll()
    for name in ("wait", "notifyAll"):
      self.assertRaises(RuntimeError, getattr(self.cond, name))
    self.cond.release()
160
161
class TestPipeCondition(_ConditionTestCase):
  """Tests for locking.PipeCondition"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check how they are notified.

    C{fn} is expected to put "A" on the done queue once it holds the
    condition and "W" once its wait is over.

    """
    threads = [self._addThread(target=fn) for _ in range(3)]

    # Wait for threads to be waiting
    for _ in threads:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(len(self.cond._waiters), 3)
    self.assertEqual(self.cond._waiters, set(threads))
    # This new thread can't acquire the lock, and thus call wait, before we
    # release it
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # We should now get 3 W and 1 A (for the new thread) in whatever order
    seen = {"W": 0, "A": 0}
    for _ in range(4):
      item = self.done.get(True, 1)
      if item not in seen:
        self.fail("Got %s on the done queue" % item)
      seen[item] += 1

    self.assertEqual(seen["W"], 3)
    self.assertEqual(seen["A"], 1)

    # Wake up the thread which started waiting after the first notification
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _WaitAndReport(self, *wait_args):
    """Acquire the condition, report "A", wait, release and report "W".

    @param wait_args: arguments passed on to the condition's C{wait}

    """
    self.cond.acquire()
    self.done.put("A")
    self.cond.wait(*wait_args)
    self.cond.release()
    self.done.put("W")

  def testBlockingWait(self):
    self._TestWait(self._WaitAndReport)

  def testLongTimeoutWait(self):
    self._TestWait(lambda: self._WaitAndReport(15.0))

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with C{timeout}, then report C{check}."""
    self.cond.acquire()
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262
263
class TestSharedLock(_ThreadedTestCase):
  """SharedLock tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLock")

  def testSequenceAndOwnership(self):
    """Ownership must follow shared and exclusive acquire/release cycles."""
    self.assertFalse(self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assertTrue(self.sl._is_owned())
    self.assertTrue(self.sl._is_owned(shared=1))
    self.assertFalse(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl._is_owned())
    self.sl.acquire()
    self.assertTrue(self.sl._is_owned())
    self.assertFalse(self.sl._is_owned(shared=1))
    self.assertTrue(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assertTrue(self.sl._is_owned())
    self.assertTrue(self.sl._is_owned(shared=1))
    self.assertFalse(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl._is_owned())

  def testBooleanValue(self):
    # semaphores are supposed to return a true value on a successful acquire
    self.assertTrue(self.sl.acquire(shared=1))
    self.sl.release()
    self.assertTrue(self.sl.acquire())
    self.sl.release()

  def testDoubleLockingStoE(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire)

  def testDoubleLockingEtoS(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingStoS(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingEtoE(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire)

  # helper functions: called in a separate thread they acquire the lock, send
  # their identifier on the done queue, then release it. If the lock operation
  # raises errors.LockError, 'ERR' is sent instead.
  def _doItSharer(self):
    try:
      self.sl.acquire(shared=1)
      self.done.put('SHR')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItExclusive(self):
    try:
      self.sl.acquire()
      self.done.put('EXC')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItDelete(self):
    try:
      self.sl.delete()
      self.done.put('DEL')
    except errors.LockError:
      self.done.put('ERR')

  def testSharersCanCoexist(self):
    self.sl.acquire(shared=1)
    # Start the thread through the helper so it is joined like all others
    self._addThread(target=self._doItSharer)
    self.assertTrue(self.done.get(True, 1))
    self.sl.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    self.sl.acquire()
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DEL')
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testExclusiveBlocksSharer(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testSharerBlocksDelete(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DEL')
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # Test is disabled; everything after the return is intentionally kept but
    # never executed
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.assertEqual(self.done.get_nowait(), 'EXC')
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # Test is disabled; everything after the return is intentionally kept but
    # never executed
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')

  def testDelete(self):
    self.sl.delete()
    self.assertRaises(errors.LockError, self.sl.acquire)
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
    self.assertRaises(errors.LockError, self.sl.delete)

  def testDeleteTimeout(self):
    self.sl.delete(timeout=60)

  def testNoDeleteIfSharer(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.delete)

  @_Repeat
  def testDeletePendingSharersExclusiveDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItDelete)
    self.sl.delete()
    self._waitThreads()
    # The threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testDeletePendingDeleteExclusiveSharers(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self.sl.delete()
    self._waitThreads()
    # All four threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testExclusiveAcquireTimeout(self):
    for shared in [0, 1]:
      on_queue = threading.Event()
      release_exclusive = threading.Event()

      def _LockExclusive():
        self.sl.acquire(shared=0, test_notify=on_queue.set)
        self.done.put("A: start wait")
        release_exclusive.wait()
        self.done.put("A: end wait")
        self.sl.release()

      # Start thread to hold lock in exclusive mode
      self._addThread(target=_LockExclusive)

      # Wait for wait to begin
      self.assertEqual(self.done.get(timeout=60), "A: start wait")

      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
      # on the queue
      self.assertTrue(self.sl.acquire(shared=shared, timeout=60,
                                      test_notify=release_exclusive.set))

      self.done.put("got 2nd")
      self.sl.release()

      self._waitThreads()

      self.assertEqual(self.done.get_nowait(), "A: end wait")
      self.assertEqual(self.done.get_nowait(), "got 2nd")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testAcquireExpiringTimeout(self):
    def _AcquireWithTimeout(shared, timeout):
      if not self.sl.acquire(shared=shared, timeout=timeout):
        self.done.put("timeout")

    for shared in [0, 1]:
      # Lock exclusively
      self.sl.acquire()

      # Start acquires (in the mode being tested) with timeout between 0 and
      # 20 ms
      for i in range(11):
        self._addThread(target=_AcquireWithTimeout,
                        args=(shared, i * 2.0 / 1000.0))

      # Wait for threads to finish (makes sure the acquire timeout expires
      # before releasing the lock)
      self._waitThreads()

      # Release lock
      self.sl.release()

      for _ in range(11):
        self.assertEqual(self.done.get_nowait(), "timeout")

      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testSharedSkipExclusiveAcquires(self):
    # Tests whether shared acquires jump in front of exclusive acquires in the
    # queue.

    def _Acquire(shared, name, notify_ev, wait_ev):
      if notify_ev:
        notify_fn = notify_ev.set
      else:
        notify_fn = None

      if wait_ev:
        wait_ev.wait()

      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
        return

      self.done.put(name)
      self.sl.release()

    # Get exclusive lock while we fill the queue
    self.sl.acquire()

    shrcnt1 = 5
    shrcnt2 = 7
    shrcnt3 = 9
    shrcnt4 = 2

    # Add acquires using threading.Event for synchronization. They'll be
    # acquired exactly in the order defined in this list.
    acquires = (shrcnt1 * [(1, "shared 1")] +
                3 * [(0, "exclusive 1")] +
                shrcnt2 * [(1, "shared 2")] +
                shrcnt3 * [(1, "shared 3")] +
                shrcnt4 * [(1, "shared 4")] +
                3 * [(0, "exclusive 2")])

    ev_cur = None
    ev_prev = None

    for args in acquires:
      ev_cur = threading.Event()
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
      ev_prev = ev_cur

    # Wait for last acquire to start
    ev_prev.wait()

    # Expect 6 pending exclusive acquires and 1 for all shared acquires
    # together
    self.assertEqual(self.sl._count_pending(), 7)

    # Release exclusive lock and wait
    self.sl.release()

    self._waitThreads()

    # Check sequence
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
      # Shared locks aren't guaranteed to be notified in order, but they'll be
      # first
      tmp = self.done.get_nowait()
      if tmp == "shared 1":
        shrcnt1 -= 1
      elif tmp == "shared 2":
        shrcnt2 -= 1
      elif tmp == "shared 3":
        shrcnt3 -= 1
      elif tmp == "shared 4":
        shrcnt4 -= 1
      else:
        # Anything else means a non-shared acquire got ahead
        self.fail("Expected a shared acquire, got '%s'" % tmp)
    # Every group of shared acquires must have been seen completely
    self.assertEqual(shrcnt1, 0)
    self.assertEqual(shrcnt2, 0)
    self.assertEqual(shrcnt3, 0)
    self.assertEqual(shrcnt4, 0)

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "exclusive 1")

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "exclusive 2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testMixedAcquireTimeout(self):
    sync = threading.Event()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testPriority(self):
    # Acquire in exclusive mode
    self.assertTrue(self.sl.acquire(shared=0))

    # Queue acquires; each thread waits for the previous one's event before
    # acquiring and has its own event set through test_notify
    def _Acquire(prev_ev, next_ev, shared, priority, result):
      prev_ev.wait()
      self.sl.acquire(shared=shared, priority=priority,
                      test_notify=next_ev.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     (self.sl.name, "exclusive",
                      [threading.currentThread().getName()], None))

    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assertTrue(self.sl._check_empty())

    # Check acquires by priority
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one.
        while names:
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _VerifyPrioPending(self, info, perprio):
    """Compares lock information with the expected pending acquires.

    @param info: tuple of (name, mode, owner, pending), as passed by
      L{testPriority} from the lock's C{GetInfo}
    @param perprio: expected acquires per priority, see the data structure
      described in L{testPriority}

    """
    (name, mode, owner, pending) = info

    self.assertEqual(name, self.sl.name)
    self.assertTrue(mode is None)
    self.assertTrue(owner is None)

    # Pending acquires must be listed by ascending priority value and, within
    # a priority, in the order they were recorded
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [(["exclusive", "shared"][int(bool(shared))],
                       sorted(t.getName() for t in threads))
                      for acquires in [perprio[i]
                                       for i in sorted(perprio.keys())]
                      for (shared, _, threads) in acquires])
818
819
class TestSharedLockInCondition(_ThreadedTestCase):
  """Tests for SharedLock used as the lock of a condition"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Create the condition under test on top of the shared lock."""
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    """A wait must leave the lock in the mode it was acquired in."""
    for shared in (1, 0):
      self.cond.acquire(shared=shared)
      self.assertTrue(self.sl._is_owned(shared=shared))
      self.cond.wait(0)
      self.assertTrue(self.sl._is_owned(shared=shared))
      self.cond.release()
842
843
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """Runs the SharedLock condition tests with locking.PipeCondition"""

  def setCondition(self):
    """Create a pipe condition on top of the shared lock."""
    self.cond = locking.PipeCondition(self.sl)
849
850
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test

  The decorated helpers below use the module-level C{_decoratorlock}; the
  inherited C{setUp} is sufficient, no lock is created per test.

  """

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Check the lock is owned inside the decorated call, then send 'EXC'."""
    self.assertTrue(_decoratorlock._is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Check the lock is owned in shared mode, then send 'SHR'."""
    self.assertTrue(_decoratorlock._is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    self._doItExclusive()
    self.assertFalse(_decoratorlock._is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock._is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    # Start the thread through the helper so it is joined like all others
    self._addThread(target=self._doItSharer)
    self.assertTrue(self.done.get(True, 1))
    _decoratorlock.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItExclusive)
    # nothing may have been reported while we still hold the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')
906
907
908 class TestLockSet(_ThreadedTestCase):
909   """LockSet tests"""
910
911   def setUp(self):
912     _ThreadedTestCase.setUp(self)
913     self._setUpLS()
914
915   def _setUpLS(self):
916     """Helper to (re)initialize the lock set"""
917     self.resources = ['one', 'two', 'three']
918     self.ls = locking.LockSet(self.resources, "TestLockSet")
919
920   def testResources(self):
921     self.assertEquals(self.ls._names(), set(self.resources))
922     newls = locking.LockSet([], "TestLockSet.testResources")
923     self.assertEquals(newls._names(), set())
924
925   def testAcquireRelease(self):
926     self.assert_(self.ls.acquire('one'))
927     self.assertEquals(self.ls._list_owned(), set(['one']))
928     self.ls.release()
929     self.assertEquals(self.ls._list_owned(), set())
930     self.assertEquals(self.ls.acquire(['one']), set(['one']))
931     self.assertEquals(self.ls._list_owned(), set(['one']))
932     self.ls.release()
933     self.assertEquals(self.ls._list_owned(), set())
934     self.ls.acquire(['one', 'two', 'three'])
935     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
936     self.ls.release('one')
937     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
938     self.ls.release(['three'])
939     self.assertEquals(self.ls._list_owned(), set(['two']))
940     self.ls.release()
941     self.assertEquals(self.ls._list_owned(), set())
942     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
943     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
944     self.ls.release()
945     self.assertEquals(self.ls._list_owned(), set())
946
947   def testNoDoubleAcquire(self):
948     self.ls.acquire('one')
949     self.assertRaises(AssertionError, self.ls.acquire, 'one')
950     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
951     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
952     self.ls.release()
953     self.ls.acquire(['one', 'three'])
954     self.ls.release('one')
955     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
956     self.ls.release('three')
957
958   def testNoWrongRelease(self):
959     self.assertRaises(AssertionError, self.ls.release)
960     self.ls.acquire('one')
961     self.assertRaises(AssertionError, self.ls.release, 'two')
962
963   def testAddRemove(self):
964     self.ls.add('four')
965     self.assertEquals(self.ls._list_owned(), set())
966     self.assert_('four' in self.ls._names())
967     self.ls.add(['five', 'six', 'seven'], acquired=1)
968     self.assert_('five' in self.ls._names())
969     self.assert_('six' in self.ls._names())
970     self.assert_('seven' in self.ls._names())
971     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
972     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
973     self.assert_('five' not in self.ls._names())
974     self.assert_('six' not in self.ls._names())
975     self.assertEquals(self.ls._list_owned(), set(['seven']))
976     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
977     self.ls.remove('seven')
978     self.assert_('seven' not in self.ls._names())
979     self.assertEquals(self.ls._list_owned(), set([]))
980     self.ls.acquire(None, shared=1)
981     self.assertRaises(AssertionError, self.ls.add, 'eight')
982     self.ls.release()
983     self.ls.acquire(None)
984     self.ls.add('eight', acquired=1)
985     self.assert_('eight' in self.ls._names())
986     self.assert_('eight' in self.ls._list_owned())
987     self.ls.add('nine')
988     self.assert_('nine' in self.ls._names())
989     self.assert_('nine' not in self.ls._list_owned())
990     self.ls.release()
991     self.ls.remove(['two'])
992     self.assert_('two' not in self.ls._names())
993     self.ls.acquire('three')
994     self.assertEquals(self.ls.remove(['three']), ['three'])
995     self.assert_('three' not in self.ls._names())
996     self.assertEquals(self.ls.remove('three'), [])
997     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
998     self.assert_('one' not in self.ls._names())
999
1000   def testRemoveNonBlocking(self):
1001     self.ls.acquire('one')
1002     self.assertEquals(self.ls.remove('one'), ['one'])
1003     self.ls.acquire(['two', 'three'])
1004     self.assertEquals(self.ls.remove(['two', 'three']),
1005                       ['two', 'three'])
1006
1007   def testNoDoubleAdd(self):
1008     self.assertRaises(errors.LockError, self.ls.add, 'two')
1009     self.ls.add('four')
1010     self.assertRaises(errors.LockError, self.ls.add, 'four')
1011
1012   def testNoWrongRemoves(self):
1013     self.ls.acquire(['one', 'three'], shared=1)
1014     # Cannot remove 'two' while holding something which is not a superset
1015     self.assertRaises(AssertionError, self.ls.remove, 'two')
1016     # Cannot remove 'three' as we are sharing it
1017     self.assertRaises(AssertionError, self.ls.remove, 'three')
1018
1019   def testAcquireSetLock(self):
1020     # acquire the set-lock exclusively
1021     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1022     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1023     self.assertEquals(self.ls._is_owned(), True)
1024     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1025     # I can still add/remove elements...
1026     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1027     self.assert_(self.ls.add('six'))
1028     self.ls.release()
1029     # share the set-lock
1030     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1031     # adding new elements is not possible
1032     self.assertRaises(AssertionError, self.ls.add, 'five')
1033     self.ls.release()
1034
1035   def testAcquireWithRepetitions(self):
1036     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1037                       set(['two', 'two', 'three']))
1038     self.ls.release(['two', 'two'])
1039     self.assertEquals(self.ls._list_owned(), set(['three']))
1040
1041   def testEmptyAcquire(self):
1042     # Acquire an empty list of locks...
1043     self.assertEquals(self.ls.acquire([]), set())
1044     self.assertEquals(self.ls._list_owned(), set())
1045     # New locks can still be addded
1046     self.assert_(self.ls.add('six'))
1047     # "re-acquiring" is not an issue, since we had really acquired nothing
1048     self.assertEquals(self.ls.acquire([], shared=1), set())
1049     self.assertEquals(self.ls._list_owned(), set())
1050     # We haven't really acquired anything, so we cannot release
1051     self.assertRaises(AssertionError, self.ls.release)
1052
1053   def _doLockSet(self, names, shared):
1054     try:
1055       self.ls.acquire(names, shared=shared)
1056       self.done.put('DONE')
1057       self.ls.release()
1058     except errors.LockError:
1059       self.done.put('ERR')
1060
1061   def _doAddSet(self, names):
1062     try:
1063       self.ls.add(names, acquired=1)
1064       self.done.put('DONE')
1065       self.ls.release()
1066     except errors.LockError:
1067       self.done.put('ERR')
1068
1069   def _doRemoveSet(self, names):
1070     self.done.put(self.ls.remove(names))
1071
1072   @_Repeat
1073   def testConcurrentSharedAcquire(self):
1074     self.ls.acquire(['one', 'two'], shared=1)
1075     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1076     self._waitThreads()
1077     self.assertEqual(self.done.get_nowait(), 'DONE')
1078     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1079     self._waitThreads()
1080     self.assertEqual(self.done.get_nowait(), 'DONE')
1081     self._addThread(target=self._doLockSet, args=('three', 1))
1082     self._waitThreads()
1083     self.assertEqual(self.done.get_nowait(), 'DONE')
1084     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1085     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1086     self.assertRaises(Queue.Empty, self.done.get_nowait)
1087     self.ls.release()
1088     self._waitThreads()
1089     self.assertEqual(self.done.get_nowait(), 'DONE')
1090     self.assertEqual(self.done.get_nowait(), 'DONE')
1091
1092   @_Repeat
1093   def testConcurrentExclusiveAcquire(self):
1094     self.ls.acquire(['one', 'two'])
1095     self._addThread(target=self._doLockSet, args=('three', 1))
1096     self._waitThreads()
1097     self.assertEqual(self.done.get_nowait(), 'DONE')
1098     self._addThread(target=self._doLockSet, args=('three', 0))
1099     self._waitThreads()
1100     self.assertEqual(self.done.get_nowait(), 'DONE')
1101     self.assertRaises(Queue.Empty, self.done.get_nowait)
1102     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1103     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1104     self._addThread(target=self._doLockSet, args=('one', 0))
1105     self._addThread(target=self._doLockSet, args=('one', 1))
1106     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1107     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1108     self.assertRaises(Queue.Empty, self.done.get_nowait)
1109     self.ls.release()
1110     self._waitThreads()
1111     for _ in range(6):
1112       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1113
1114   @_Repeat
1115   def testSimpleAcquireTimeoutExpiring(self):
1116     names = sorted(self.ls._names())
1117     self.assert_(len(names) >= 3)
1118
1119     # Get name of first lock
1120     first = names[0]
1121
1122     # Get name of last lock
1123     last = names.pop()
1124
1125     checks = [
1126       # Block first and try to lock it again
1127       (first, first),
1128
1129       # Block last and try to lock all locks
1130       (None, first),
1131
1132       # Block last and try to lock it again
1133       (last, last),
1134       ]
1135
1136     for (wanted, block) in checks:
1137       # Lock in exclusive mode
1138       self.assert_(self.ls.acquire(block, shared=0))
1139
1140       def _AcquireOne():
1141         # Try to get the same lock again with a timeout (should never succeed)
1142         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1143         if acquired:
1144           self.done.put("acquired")
1145           self.ls.release()
1146         else:
1147           self.assert_(acquired is None)
1148           self.assertFalse(self.ls._list_owned())
1149           self.assertFalse(self.ls._is_owned())
1150           self.done.put("not acquired")
1151
1152       self._addThread(target=_AcquireOne)
1153
1154       # Wait for timeout in thread to expire
1155       self._waitThreads()
1156
1157       # Release exclusive lock again
1158       self.ls.release()
1159
1160       self.assertEqual(self.done.get_nowait(), "not acquired")
1161       self.assertRaises(Queue.Empty, self.done.get_nowait)
1162
  @_Repeat
  def testDelayedAndExpiringLockAcquire(self):
    """Acquire a set of locks which is released slowly by another thread.

    Runs two rounds. Without expiry the acquiring thread gets a generous
    timeout and must report every lock in sorted order followed by
    "got all". With expiry the main thread sleeps 0.3s before releasing each
    lock while the acquiring thread only has 0.4s in total, so it must report
    "timeout on all".

    """
    # Start from a fresh lock set with eight locks
    self._setUpLS()
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])

    for expire in (False, True):
      names = sorted(self.ls._names())
      self.assertEqual(len(names), 8)

      # One event per lock, set once the other thread starts working on it
      lock_ev = dict([(i, threading.Event()) for i in names])

      # Lock all in exclusive mode
      self.assert_(self.ls.acquire(names, shared=0))

      if expire:
        # We'll wait at least 300ms per lock
        lockwait = len(names) * [0.3]

        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
        # this gives us up to 2.4s to fail.
        lockall_timeout = 0.4
      else:
        # This should finish rather quickly
        lockwait = None
        lockall_timeout = len(names) * 5.0

      def _LockAll():
        # NOTE(review): test_notify appears to be called with the lock name
        # when the acquire of that lock starts; confirm in locking.LockSet
        def acquire_notification(name):
          if not expire:
            self.done.put("getting %s" % name)

          # Kick next lock
          lock_ev[name].set()

        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                           test_notify=acquire_notification):
          self.done.put("got all")
          self.ls.release()
        else:
          self.done.put("timeout on all")

        # Notify all locks
        # (so the main thread never waits for a notification which won't come)
        for ev in lock_ev.values():
          ev.set()

      t = self._addThread(target=_LockAll)

      for idx, name in enumerate(names):
        # Wait for actual acquire on this lock to start
        lock_ev[name].wait(10.0)

        if expire and t.isAlive():
          # Wait some time after getting the notification to make sure the lock
          # acquire will expire
          SafeSleep(lockwait[idx])

        self.ls.release(names=name)

      # Everything held by this thread must have been released by now
      self.assertFalse(self.ls._list_owned())

      self._waitThreads()

      if expire:
        # Not checking which locks were actually acquired. Doing so would be
        # too timing-dependent.
        self.assertEqual(self.done.get_nowait(), "timeout on all")
      else:
        for i in names:
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
        self.assertEqual(self.done.get_nowait(), "got all")
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1234
1235   @_Repeat
1236   def testConcurrentRemove(self):
1237     self.ls.add('four')
1238     self.ls.acquire(['one', 'two', 'four'])
1239     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1240     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1241     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1242     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1243     self.assertRaises(Queue.Empty, self.done.get_nowait)
1244     self.ls.remove('one')
1245     self.ls.release()
1246     self._waitThreads()
1247     for i in range(4):
1248       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1249     self.ls.add(['five', 'six'], acquired=1)
1250     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1251     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1252     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1253     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1254     self.ls.remove('five')
1255     self.ls.release()
1256     self._waitThreads()
1257     for i in range(4):
1258       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1259     self.ls.acquire(['three', 'four'])
1260     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1261     self.assertRaises(Queue.Empty, self.done.get_nowait)
1262     self.ls.remove('four')
1263     self._waitThreads()
1264     self.assertEqual(self.done.get_nowait(), ['six'])
1265     self._addThread(target=self._doRemoveSet, args=(['two']))
1266     self._waitThreads()
1267     self.assertEqual(self.done.get_nowait(), ['two'])
1268     self.ls.release()
1269     # reset lockset
1270     self._setUpLS()
1271
1272   @_Repeat
1273   def testConcurrentSharedSetLock(self):
1274     # share the set-lock...
1275     self.ls.acquire(None, shared=1)
1276     # ...another thread can share it too
1277     self._addThread(target=self._doLockSet, args=(None, 1))
1278     self._waitThreads()
1279     self.assertEqual(self.done.get_nowait(), 'DONE')
1280     # ...or just share some elements
1281     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1282     self._waitThreads()
1283     self.assertEqual(self.done.get_nowait(), 'DONE')
1284     # ...but not add new ones or remove any
1285     t = self._addThread(target=self._doAddSet, args=(['nine']))
1286     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1287     self.assertRaises(Queue.Empty, self.done.get_nowait)
1288     # this just releases the set-lock
1289     self.ls.release([])
1290     t.join(60)
1291     self.assertEqual(self.done.get_nowait(), 'DONE')
1292     # release the lock on the actual elements so remove() can proceed too
1293     self.ls.release()
1294     self._waitThreads()
1295     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1296     # reset lockset
1297     self._setUpLS()
1298
1299   @_Repeat
1300   def testConcurrentExclusiveSetLock(self):
1301     # acquire the set-lock...
1302     self.ls.acquire(None, shared=0)
1303     # ...no one can do anything else
1304     self._addThread(target=self._doLockSet, args=(None, 1))
1305     self._addThread(target=self._doLockSet, args=(None, 0))
1306     self._addThread(target=self._doLockSet, args=(['three'], 0))
1307     self._addThread(target=self._doLockSet, args=(['two'], 1))
1308     self._addThread(target=self._doAddSet, args=(['nine']))
1309     self.assertRaises(Queue.Empty, self.done.get_nowait)
1310     self.ls.release()
1311     self._waitThreads()
1312     for _ in range(5):
1313       self.assertEqual(self.done.get(True, 1), 'DONE')
1314     # cleanup
1315     self._setUpLS()
1316
1317   @_Repeat
1318   def testConcurrentSetLockAdd(self):
1319     self.ls.acquire('one')
1320     # Another thread wants the whole SetLock
1321     self._addThread(target=self._doLockSet, args=(None, 0))
1322     self._addThread(target=self._doLockSet, args=(None, 1))
1323     self.assertRaises(Queue.Empty, self.done.get_nowait)
1324     self.assertRaises(AssertionError, self.ls.add, 'four')
1325     self.ls.release()
1326     self._waitThreads()
1327     self.assertEqual(self.done.get_nowait(), 'DONE')
1328     self.assertEqual(self.done.get_nowait(), 'DONE')
1329     self.ls.acquire(None)
1330     self._addThread(target=self._doLockSet, args=(None, 0))
1331     self._addThread(target=self._doLockSet, args=(None, 1))
1332     self.assertRaises(Queue.Empty, self.done.get_nowait)
1333     self.ls.add('four')
1334     self.ls.add('five', acquired=1)
1335     self.ls.add('six', acquired=1, shared=1)
1336     self.assertEquals(self.ls._list_owned(),
1337       set(['one', 'two', 'three', 'five', 'six']))
1338     self.assertEquals(self.ls._is_owned(), True)
1339     self.assertEquals(self.ls._names(),
1340       set(['one', 'two', 'three', 'four', 'five', 'six']))
1341     self.ls.release()
1342     self._waitThreads()
1343     self.assertEqual(self.done.get_nowait(), 'DONE')
1344     self.assertEqual(self.done.get_nowait(), 'DONE')
1345     self._setUpLS()
1346
1347   @_Repeat
1348   def testEmptyLockSet(self):
1349     # get the set-lock
1350     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1351     # now empty it...
1352     self.ls.remove(['one', 'two', 'three'])
1353     # and adds/locks by another thread still wait
1354     self._addThread(target=self._doAddSet, args=(['nine']))
1355     self._addThread(target=self._doLockSet, args=(None, 1))
1356     self._addThread(target=self._doLockSet, args=(None, 0))
1357     self.assertRaises(Queue.Empty, self.done.get_nowait)
1358     self.ls.release()
1359     self._waitThreads()
1360     for _ in range(3):
1361       self.assertEqual(self.done.get_nowait(), 'DONE')
1362     # empty it again...
1363     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1364     # now share it...
1365     self.assertEqual(self.ls.acquire(None, shared=1), set())
1366     # other sharers can go, adds still wait
1367     self._addThread(target=self._doLockSet, args=(None, 1))
1368     self._waitThreads()
1369     self.assertEqual(self.done.get_nowait(), 'DONE')
1370     self._addThread(target=self._doAddSet, args=(['nine']))
1371     self.assertRaises(Queue.Empty, self.done.get_nowait)
1372     self.ls.release()
1373     self._waitThreads()
1374     self.assertEqual(self.done.get_nowait(), 'DONE')
1375     self._setUpLS()
1376
1377   def testPriority(self):
1378     def _Acquire(prev, next, name, priority, success_fn):
1379       prev.wait()
1380       self.assert_(self.ls.acquire(name, shared=0,
1381                                    priority=priority,
1382                                    test_notify=lambda _: next.set()))
1383       try:
1384         success_fn()
1385       finally:
1386         self.ls.release()
1387
1388     # Get all in exclusive mode
1389     self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1390
1391     done_two = Queue.Queue(0)
1392
1393     first = threading.Event()
1394     prev = first
1395
1396     acquires = [("one", prio, self.done) for prio in range(1, 33)]
1397     acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1398
1399     # Use a deterministic random generator
1400     random.Random(741).shuffle(acquires)
1401
1402     for (name, prio, done) in acquires:
1403       ev = threading.Event()
1404       self._addThread(target=_Acquire,
1405                       args=(prev, ev, name, prio,
1406                             compat.partial(done.put, "Prio%s" % prio)))
1407       prev = ev
1408
1409     # Start acquires
1410     first.set()
1411
1412     # Wait for last acquire to start
1413     prev.wait()
1414
1415     # Let threads acquire locks
1416     self.ls.release()
1417
1418     # Wait for threads to finish
1419     self._waitThreads()
1420
1421     for i in range(1, 33):
1422       self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1423       self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1424
1425     self.assertRaises(Queue.Empty, self.done.get_nowait)
1426     self.assertRaises(Queue.Empty, done_two.get_nowait)
1427
1428
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for locking.GanetiLockManager"""

  def setUp(self):
    """Create a lock manager with two nodes, two groups and three instances"""
    _ThreadedTestCase.setUp(self)
    self.nodes = ['n1', 'n2']
    self.nodegroups = ['g1', 'g2']
    self.instances = ['i1', 'i2', 'i3']
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
                                        self.instances)

  def tearDown(self):
    """Reset the manager's instance marker so the next test can create one"""
    # Don't try this at home...
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    """Each level constant must equal its position in locking.LEVELS"""
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    """Creating a second lock manager must fail"""
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])

  def testLockNames(self):
    """Every level reports the names given at construction time"""
    expected = [
      (locking.LEVEL_CLUSTER, set(['BGL'])),
      (locking.LEVEL_NODE, set(self.nodes)),
      (locking.LEVEL_NODEGROUP, set(self.nodegroups)),
      (locking.LEVEL_INSTANCE, set(self.instances)),
      ]
    for (level, names) in expected:
      self.assertEqual(self.GL._names(level), names)

  def testInitAndResources(self):
    """Create lock managers with different combinations of empty levels"""
    configs = [
      ([], [], []),
      (self.nodes, self.nodegroups, []),
      ([], [], self.instances),
      ]

    for (nodes, nodegroups, instances) in configs:
      # Forget the previous manager, otherwise creating a new one fails
      locking.GanetiLockManager._instance = None
      self.GL = locking.GanetiLockManager(nodes, nodegroups, instances)
      self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
      self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
      self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                       set(nodegroups))
      self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                       set(instances))

  def testAcquireRelease(self):
    """Acquire and release locks at several levels"""
    GL = self.GL

    def _CheckOwned(level, *expected):
      # The current thread must own exactly the given locks at this level
      self.assertEqual(GL._list_owned(level), set(expected))

    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    _CheckOwned(locking.LEVEL_CLUSTER, 'BGL')
    GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
    GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)

    # Partial release at the node level
    GL.release(locking.LEVEL_NODE, ['n2'])
    _CheckOwned(locking.LEVEL_NODE, 'n1')
    _CheckOwned(locking.LEVEL_NODEGROUP, 'g2')
    _CheckOwned(locking.LEVEL_INSTANCE, 'i1')

    # Full release at the node level leaves the other levels alone
    GL.release(locking.LEVEL_NODE)
    _CheckOwned(locking.LEVEL_NODE)
    _CheckOwned(locking.LEVEL_NODEGROUP, 'g2')
    _CheckOwned(locking.LEVEL_INSTANCE, 'i1')
    GL.release(locking.LEVEL_NODEGROUP)
    GL.release(locking.LEVEL_INSTANCE)

    # Unknown names are refused, known ones still work afterwards
    self.assertRaises(errors.LockError, GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    _CheckOwned(locking.LEVEL_INSTANCE, 'i3')

  def testAcquireWholeSets(self):
    """Acquire every lock at the instance, node group and node levels"""
    GL = self.GL
    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

    all_instances = set(self.instances)
    self.assertEqual(GL.acquire(locking.LEVEL_INSTANCE, None), all_instances)
    self.assertEqual(GL._list_owned(locking.LEVEL_INSTANCE), all_instances)

    all_groups = set(self.nodegroups)
    self.assertEqual(GL.acquire(locking.LEVEL_NODEGROUP, None), all_groups)
    self.assertEqual(GL._list_owned(locking.LEVEL_NODEGROUP), all_groups)

    all_nodes = set(self.nodes)
    self.assertEqual(GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     all_nodes)
    self.assertEqual(GL._list_owned(locking.LEVEL_NODE), all_nodes)

    for level in (locking.LEVEL_NODE, locking.LEVEL_NODEGROUP,
                  locking.LEVEL_INSTANCE, locking.LEVEL_CLUSTER):
      GL.release(level)

  def testAcquireWholeAndPartial(self):
    """Mix a whole-level acquire with a partial one at another level"""
    GL = self.GL
    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

    all_instances = set(self.instances)
    self.assertEqual(GL.acquire(locking.LEVEL_INSTANCE, None), all_instances)
    self.assertEqual(GL._list_owned(locking.LEVEL_INSTANCE), all_instances)

    self.assertEqual(GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(GL._list_owned(locking.LEVEL_NODE), set(['n2']))

    for level in (locking.LEVEL_NODE, locking.LEVEL_INSTANCE,
                  locking.LEVEL_CLUSTER):
      GL.release(level)

  def testBGLDependency(self):
    """Other levels need the BGL, which can't be released before them"""
    GL = self.GL

    def _CheckBGLNotReleasable():
      # Neither the named nor the whole-level release may succeed
      self.assertRaises(AssertionError, GL.release,
                        locking.LEVEL_CLUSTER, ['BGL'])
      self.assertRaises(AssertionError, GL.release,
                        locking.LEVEL_CLUSTER)

    # Without the BGL nothing can be acquired at the other levels
    for (level, names) in [(locking.LEVEL_NODE, ['n1', 'n2']),
                           (locking.LEVEL_INSTANCE, ['i3']),
                           (locking.LEVEL_NODEGROUP, ['g1'])]:
      self.assertRaises(AssertionError, GL.acquire, level, names)

    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

    GL.acquire(locking.LEVEL_NODE, ['n1'])
    _CheckBGLNotReleasable()
    GL.release(locking.LEVEL_NODE)

    GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    _CheckBGLNotReleasable()
    GL.release(locking.LEVEL_INSTANCE)

    # A partial release at a level isn't enough either
    GL.acquire(locking.LEVEL_NODEGROUP, None)
    GL.release(locking.LEVEL_NODEGROUP, ['g1'])
    _CheckBGLNotReleasable()
    GL.release(locking.LEVEL_NODEGROUP)

    GL.release(locking.LEVEL_CLUSTER)

  def testWrongOrder(self):
    """After locking a node, these node/group/instance acquires must fail"""
    GL = self.GL
    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    GL.acquire(locking.LEVEL_NODE, ['n2'])

    for (level, names) in [(locking.LEVEL_NODE, ['n1']),
                           (locking.LEVEL_NODEGROUP, ['g1']),
                           (locking.LEVEL_INSTANCE, ['i2'])]:
      self.assertRaises(AssertionError, GL.acquire, level, names)

  def testModifiableLevels(self):
    """Locks can be added and removed at all levels but the cluster level"""
    GL = self.GL

    self.assertRaises(AssertionError, GL.add, locking.LEVEL_CLUSTER,
                      ['BGL2'])
    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])

    GL.add(locking.LEVEL_INSTANCE, ['i4'])
    GL.remove(locking.LEVEL_INSTANCE, ['i3'])
    GL.remove(locking.LEVEL_INSTANCE, ['i1'])
    self.assertEqual(GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))

    GL.add(locking.LEVEL_NODE, ['n3'])
    GL.remove(locking.LEVEL_NODE, ['n1'])
    self.assertEqual(GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))

    GL.add(locking.LEVEL_NODEGROUP, ['g3'])
    GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
    GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
    self.assertEqual(GL._names(locking.LEVEL_NODEGROUP), set(['g3']))

    self.assertRaises(AssertionError, GL.remove, locking.LEVEL_CLUSTER,
                      ['BGL2'])

  # Helper function to run as a thread that shares the BGL and then acquires
  # some locks at another level.
  def _doLock(self, level, names, shared):
    """Share the BGL, acquire locks at another level and report the outcome.

    Puts 'DONE' into self.done once the locks are held, or 'ERR' if
    errors.LockError is raised.

    """
    manager = self.GL
    report = self.done.put
    try:
      manager.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
      manager.acquire(level, names, shared=shared)
      report('DONE')
      manager.release(level)
      manager.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      report('ERR')

  @_Repeat
  def testConcurrency(self):
    """Instance locks requested by another thread while this one holds some"""
    GL = self.GL

    def _Spawn(name, shared):
      # Let another thread acquire the given instance lock
      self._addThread(target=self._doLock,
                      args=(locking.LEVEL_INSTANCE, name, shared))

    def _SpawnAndCheckDone(name, shared):
      # The other thread must get the lock without having to wait for us
      _Spawn(name, shared)
      self._waitThreads()
      self.assertEqual(self.done.get_nowait(), 'DONE')

    GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    _SpawnAndCheckDone('i1', 1)

    # Holding 'i3' exclusively doesn't affect 'i1'...
    GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
    _SpawnAndCheckDone('i1', 1)
    # ...but 'i3' itself can't even be shared until it's released
    _Spawn('i3', 1)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    # A shared 'i2' can be shared by others...
    GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
    _SpawnAndCheckDone('i2', 1)
    # ...but not acquired exclusively until it's released
    _Spawn('i2', 0)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), 'DONE')

    GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1639
1640
1641 class TestLockMonitor(_ThreadedTestCase):
  def setUp(self):
    """Run the base class setup and create a fresh lock monitor in self.lm"""
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()
1645
1646   def testSingleThread(self):
1647     locks = []
1648
1649     for i in range(100):
1650       name = "TestLock%s" % i
1651       locks.append(locking.SharedLock(name, monitor=self.lm))
1652
1653     self.assertEqual(len(self.lm._locks), len(locks))
1654     result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1655     self.assertEqual(len(result.fields), 1)
1656     self.assertEqual(len(result.data), 100)
1657
1658     # Delete all locks
1659     del locks[:]
1660
1661     # The garbage collector might needs some time
1662     def _CheckLocks():
1663       if self.lm._locks:
1664         raise utils.RetryAgain()
1665
1666     utils.Retry(_CheckLocks, 0.1, 30.0)
1667
1668     self.assertFalse(self.lm._locks)
1669
1670   def testMultiThread(self):
1671     locks = []
1672
1673     def _CreateLock(prev, next, name):
1674       prev.wait()
1675       locks.append(locking.SharedLock(name, monitor=self.lm))
1676       if next:
1677         next.set()
1678
1679     expnames = []
1680
1681     first = threading.Event()
1682     prev = first
1683
1684     # Use a deterministic random generator
1685     for i in random.Random(4263).sample(range(100), 33):
1686       name = "MtTestLock%s" % i
1687       expnames.append(name)
1688
1689       ev = threading.Event()
1690       self._addThread(target=_CreateLock, args=(prev, ev, name))
1691       prev = ev
1692
1693     # Add locks
1694     first.set()
1695     self._waitThreads()
1696
1697     # Check order in which locks were added
1698     self.assertEqual([i.name for i in locks], expnames)
1699
1700     # Check query result
1701     result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1702     self.assert_(isinstance(result, dict))
1703     response = objects.QueryResponse.FromDict(result)
1704     self.assertEqual(response.data,
1705                      [[(constants.RS_NORMAL, name),
1706                        (constants.RS_NORMAL, None),
1707                        (constants.RS_NORMAL, None),
1708                        (constants.RS_NORMAL, [])]
1709                       for name in utils.NiceSort(expnames)])
1710     self.assertEqual(len(response.fields), 4)
1711     self.assertEqual(["name", "mode", "owner", "pending"],
1712                      [fdef.name for fdef in response.fields])
1713
1714     # Test exclusive acquire
1715     for tlock in locks[::4]:
1716       tlock.acquire(shared=0)
1717       try:
1718         def _GetExpResult(name):
1719           if tlock.name == name:
1720             return [(constants.RS_NORMAL, name),
1721                     (constants.RS_NORMAL, "exclusive"),
1722                     (constants.RS_NORMAL,
1723                      [threading.currentThread().getName()]),
1724                     (constants.RS_NORMAL, [])]
1725           return [(constants.RS_NORMAL, name),
1726                   (constants.RS_NORMAL, None),
1727                   (constants.RS_NORMAL, None),
1728                   (constants.RS_NORMAL, [])]
1729
1730         result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1731         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1732                          [_GetExpResult(name)
1733                           for name in utils.NiceSort(expnames)])
1734       finally:
1735         tlock.release()
1736
1737     # Test shared acquire
1738     def _Acquire(lock, shared, ev, notify):
1739       lock.acquire(shared=shared)
1740       try:
1741         notify.set()
1742         ev.wait()
1743       finally:
1744         lock.release()
1745
1746     for tlock1 in locks[::11]:
1747       for tlock2 in locks[::-15]:
1748         if tlock2 == tlock1:
1749           # Avoid deadlocks
1750           continue
1751
1752         for tlock3 in locks[::10]:
1753           if tlock3 in (tlock2, tlock1):
1754             # Avoid deadlocks
1755             continue
1756
1757           releaseev = threading.Event()
1758
1759           # Acquire locks
1760           acquireev = []
1761           tthreads1 = []
1762           for i in range(3):
1763             ev = threading.Event()
1764             tthreads1.append(self._addThread(target=_Acquire,
1765                                              args=(tlock1, 1, releaseev, ev)))
1766             acquireev.append(ev)
1767
1768           ev = threading.Event()
1769           tthread2 = self._addThread(target=_Acquire,
1770                                      args=(tlock2, 1, releaseev, ev))
1771           acquireev.append(ev)
1772
1773           ev = threading.Event()
1774           tthread3 = self._addThread(target=_Acquire,
1775                                      args=(tlock3, 0, releaseev, ev))
1776           acquireev.append(ev)
1777
1778           # Wait for all locks to be acquired
1779           for i in acquireev:
1780             i.wait()
1781
1782           # Check query result
1783           result = self.lm.QueryLocks(["name", "mode", "owner"])
1784           response = objects.QueryResponse.FromDict(result)
1785           for (name, mode, owner) in response.data:
1786             (name_status, name_value) = name
1787             (owner_status, owner_value) = owner
1788
1789             self.assertEqual(name_status, constants.RS_NORMAL)
1790             self.assertEqual(owner_status, constants.RS_NORMAL)
1791
1792             if name_value == tlock1.name:
1793               self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1794               self.assertEqual(set(owner_value),
1795                                set(i.getName() for i in tthreads1))
1796               continue
1797
1798             if name_value == tlock2.name:
1799               self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1800               self.assertEqual(owner_value, [tthread2.getName()])
1801               continue
1802
1803             if name_value == tlock3.name:
1804               self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1805               self.assertEqual(owner_value, [tthread3.getName()])
1806               continue
1807
1808             self.assert_(name_value in expnames)
1809             self.assertEqual(mode, (constants.RS_NORMAL, None))
1810             self.assert_(owner_value is None)
1811
1812           # Release locks again
1813           releaseev.set()
1814
1815           self._waitThreads()
1816
1817           result = self.lm.QueryLocks(["name", "mode", "owner"])
1818           self.assertEqual(objects.QueryResponse.FromDict(result).data,
1819                            [[(constants.RS_NORMAL, name),
1820                              (constants.RS_NORMAL, None),
1821                              (constants.RS_NORMAL, None)]
1822                             for name in utils.NiceSort(expnames)])
1823
1824   def testDelete(self):
1825     lock = locking.SharedLock("TestLock", monitor=self.lm)
1826
1827     self.assertEqual(len(self.lm._locks), 1)
1828     result = self.lm.QueryLocks(["name", "mode", "owner"])
1829     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1830                      [[(constants.RS_NORMAL, lock.name),
1831                        (constants.RS_NORMAL, None),
1832                        (constants.RS_NORMAL, None)]])
1833
1834     lock.delete()
1835
1836     result = self.lm.QueryLocks(["name", "mode", "owner"])
1837     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1838                      [[(constants.RS_NORMAL, lock.name),
1839                        (constants.RS_NORMAL, "deleted"),
1840                        (constants.RS_NORMAL, None)]])
1841     self.assertEqual(len(self.lm._locks), 1)
1842
1843   def testPending(self):
1844     def _Acquire(lock, shared, prev, next):
1845       prev.wait()
1846
1847       lock.acquire(shared=shared, test_notify=next.set)
1848       try:
1849         pass
1850       finally:
1851         lock.release()
1852
1853     lock = locking.SharedLock("ExcLock", monitor=self.lm)
1854
1855     for shared in [0, 1]:
1856       lock.acquire()
1857       try:
1858         self.assertEqual(len(self.lm._locks), 1)
1859         result = self.lm.QueryLocks(["name", "mode", "owner"])
1860         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1861                          [[(constants.RS_NORMAL, lock.name),
1862                            (constants.RS_NORMAL, "exclusive"),
1863                            (constants.RS_NORMAL,
1864                             [threading.currentThread().getName()])]])
1865
1866         threads = []
1867
1868         first = threading.Event()
1869         prev = first
1870
1871         for i in range(5):
1872           ev = threading.Event()
1873           threads.append(self._addThread(target=_Acquire,
1874                                           args=(lock, shared, prev, ev)))
1875           prev = ev
1876
1877         # Start acquires
1878         first.set()
1879
1880         # Wait for last acquire to start waiting
1881         prev.wait()
1882
1883         # NOTE: This works only because QueryLocks will acquire the
1884         # lock-internal lock again and won't be able to get the information
1885         # until it has the lock. By then the acquire should be registered in
1886         # SharedLock.__pending (otherwise it's a bug).
1887
1888         # All acquires are waiting now
1889         if shared:
1890           pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
1891         else:
1892           pending = [("exclusive", [t.getName()]) for t in threads]
1893
1894         result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1895         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1896                          [[(constants.RS_NORMAL, lock.name),
1897                            (constants.RS_NORMAL, "exclusive"),
1898                            (constants.RS_NORMAL,
1899                             [threading.currentThread().getName()]),
1900                            (constants.RS_NORMAL, pending)]])
1901
1902         self.assertEqual(len(self.lm._locks), 1)
1903       finally:
1904         lock.release()
1905
1906       self._waitThreads()
1907
1908       # No pending acquires
1909       result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1910       self.assertEqual(objects.QueryResponse.FromDict(result).data,
1911                        [[(constants.RS_NORMAL, lock.name),
1912                          (constants.RS_NORMAL, None),
1913                          (constants.RS_NORMAL, None),
1914                          (constants.RS_NORMAL, [])]])
1915
1916       self.assertEqual(len(self.lm._locks), 1)
1917
1918   def testDeleteAndRecreate(self):
1919     lname = "TestLock101923193"
1920
1921     # Create some locks with the same name and keep all references
1922     locks = [locking.SharedLock(lname, monitor=self.lm)
1923              for _ in range(5)]
1924
1925     self.assertEqual(len(self.lm._locks), len(locks))
1926
1927     result = self.lm.QueryLocks(["name", "mode", "owner"])
1928     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1929                      [[(constants.RS_NORMAL, lname),
1930                        (constants.RS_NORMAL, None),
1931                        (constants.RS_NORMAL, None)]] * 5)
1932
1933     locks[2].delete()
1934
1935     # Check information order
1936     result = self.lm.QueryLocks(["name", "mode", "owner"])
1937     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1938                      [[(constants.RS_NORMAL, lname),
1939                        (constants.RS_NORMAL, None),
1940                        (constants.RS_NORMAL, None)]] * 2 +
1941                      [[(constants.RS_NORMAL, lname),
1942                        (constants.RS_NORMAL, "deleted"),
1943                        (constants.RS_NORMAL, None)]] +
1944                      [[(constants.RS_NORMAL, lname),
1945                        (constants.RS_NORMAL, None),
1946                        (constants.RS_NORMAL, None)]] * 2)
1947
1948     locks[1].acquire(shared=0)
1949
1950     last_status = [
1951       [(constants.RS_NORMAL, lname),
1952        (constants.RS_NORMAL, None),
1953        (constants.RS_NORMAL, None)],
1954       [(constants.RS_NORMAL, lname),
1955        (constants.RS_NORMAL, "exclusive"),
1956        (constants.RS_NORMAL, [threading.currentThread().getName()])],
1957       [(constants.RS_NORMAL, lname),
1958        (constants.RS_NORMAL, "deleted"),
1959        (constants.RS_NORMAL, None)],
1960       [(constants.RS_NORMAL, lname),
1961        (constants.RS_NORMAL, None),
1962        (constants.RS_NORMAL, None)],
1963       [(constants.RS_NORMAL, lname),
1964        (constants.RS_NORMAL, None),
1965        (constants.RS_NORMAL, None)],
1966       ]
1967
1968     # Check information order
1969     result = self.lm.QueryLocks(["name", "mode", "owner"])
1970     self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
1971
1972     self.assertEqual(len(set(self.lm._locks.values())), len(locks))
1973     self.assertEqual(len(self.lm._locks), len(locks))
1974
1975     # Check lock deletion
1976     for idx in range(len(locks)):
1977       del locks[0]
1978       assert gc.isenabled()
1979       gc.collect()
1980       self.assertEqual(len(self.lm._locks), len(locks))
1981       result = self.lm.QueryLocks(["name", "mode", "owner"])
1982       self.assertEqual(objects.QueryResponse.FromDict(result).data,
1983                        last_status[idx + 1:])
1984
1985     # All locks should have been deleted
1986     assert not locks
1987     self.assertFalse(self.lm._locks)
1988
1989     result = self.lm.QueryLocks(["name", "mode", "owner"])
1990     self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
1991
1992
# Run the test program when this file is executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()