--select-instances hbal manpage update
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
# This is used to test the ssynchronized decorator.
# Since it's passed as input to a decorator it must be declared as a global.
_decoratorlock = locking.SharedLock("decorator lock")

#: List for looping tests; the L{_Repeat} decorator runs the decorated
#: function once per entry (8 times)
ITERATIONS = range(8)
51
52
53 def _Repeat(fn):
54   """Decorator for executing a function many times"""
55   def wrapper(*args, **kwargs):
56     for i in ITERATIONS:
57       fn(*args, **kwargs)
58   return wrapper
59
60
def SafeSleep(duration):
  """Sleep until at least C{duration} seconds have passed.

  C{time.sleep} is called again with the remaining time for as long as the
  deadline hasn't been reached. Returns immediately for zero or negative
  durations.

  @param duration: number of seconds to sleep

  """
  deadline = time.time() + duration
  remaining = deadline - time.time()
  while remaining > 0.0:
    time.sleep(remaining)
    remaining = deadline - time.time()
68
69
70 class _ThreadedTestCase(unittest.TestCase):
71   """Test class that supports adding/waiting on threads"""
72   def setUp(self):
73     unittest.TestCase.setUp(self)
74     self.done = Queue.Queue(0)
75     self.threads = []
76
77   def _addThread(self, *args, **kwargs):
78     """Create and remember a new thread"""
79     t = threading.Thread(*args, **kwargs)
80     self.threads.append(t)
81     t.start()
82     return t
83
84   def _waitThreads(self):
85     """Wait for all our threads to finish"""
86     for t in self.threads:
87       t.join(60)
88       self.failIf(t.isAlive())
89     self.threads = []
90
91
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the tests of all condition implementations.

  Subclasses pass the condition class under test to L{setUp} and call the
  C{_test*} helpers from their own test methods.

  """

  def setUp(self, cls):
    """Create a plain lock and a condition of type C{cls} wrapping it.

    @param cls: condition class, called with the lock as its only argument

    """
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _CheckNotOwned(self):
    """Verify the condition is not owned and refuses wait and notify.

    """
    cond = self.cond
    self.assertFalse(cond._is_owned())
    self.assertRaises(RuntimeError, cond.wait, None)
    self.assertRaises(RuntimeError, cond.notifyAll)

  def _testAcquireRelease(self):
    """Check ownership before, during and after holding the condition.

    """
    self._CheckNotOwned()

    cond = self.cond
    cond.acquire()
    self.assertTrue(cond._is_owned())
    # Notifying must not give up ownership
    cond.notifyAll()
    self.assertTrue(cond._is_owned())
    cond.release()

    self._CheckNotOwned()

  def _testNotification(self):
    """Check that a waiter is woken up by a notification from a thread.

    The notifying thread reports "NE" on entry, "NA" once it acquired the
    condition and "NN" after having sent the notification.

    """
    report = self.done.put

    def _Notifier():
      report("NE")
      self.cond.acquire()
      report("NA")
      self.cond.notifyAll()
      report("NN")
      self.cond.release()

    self.cond.acquire()
    self._addThread(target=_Notifier)

    # The notifier has started, but can't get any further while the
    # condition is held by this thread
    self.assertEqual(self.done.get(True, 1), "NE")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.wait(None)

    for expected in ("NA", "NN"):
      self.assertEqual(self.done.get(True, 1), expected)

    # The condition is owned again once wait has returned
    self.assertTrue(self.cond._is_owned())
    self.cond.release()
    self.assertFalse(self.cond._is_owned())
134
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """Tests for L{locking.SingleNotifyPipeCondition}"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    # Generic ownership checks from the base class
    self._testAcquireRelease()

  def testNotification(self):
    # Generic notification check from the base class
    self._testNotification()

  def testWaitReuse(self):
    # Waiting more than once must work as long as nobody has notified
    cond = self.cond
    cond.acquire()
    for timeout in (0, 0.1):
      cond.wait(timeout)
    cond.release()

  def testNoNotifyReuse(self):
    # After a notification neither waiting nor notifying again is accepted
    cond = self.cond
    cond.acquire()
    cond.notifyAll()
    self.assertRaises(RuntimeError, cond.wait, None)
    self.assertRaises(RuntimeError, cond.notifyAll)
    cond.release()
160
161
class TestPipeCondition(_ConditionTestCase):
  """Tests for L{locking.PipeCondition}"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    # Generic ownership checks from the base class
    self._testAcquireRelease()

  def testNotification(self):
    # Generic notification check from the base class
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check how they are notified.

    @param fn: thread function; has to report "A" once it acquired the
        condition and "W" after its wait has finished

    """
    waiters = [self._addThread(target=fn) for _ in range(3)]

    # Wait for threads to be waiting
    for _ in waiters:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(len(self.cond._waiters), 3)
    self.assertEqual(self.cond._waiters, set(waiters))
    # This new thread can't acquire the lock, and thus call wait, before we
    # release it
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # We should now get 3 W and 1 A (for the new thread) in whatever order
    counts = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in counts:
        self.fail("Got %s on the done queue" % got)
      counts[got] += 1

    self.assertEqual(counts["W"], 3)
    self.assertEqual(counts["A"], 1)

    # Wake up the late thread as well
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _WaitAndReport(self, timeout):
    """Thread function for L{_TestWait}.

    @param timeout: timeout passed to the wait call of the condition

    """
    self.cond.acquire()
    self.done.put("A")
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put("W")

  def testBlockingWait(self):
    # Waiters without timeout
    self._TestWait(lambda: self._WaitAndReport(None))

  def testLongTimeoutWait(self):
    # Waiters whose timeout is far longer than the test takes
    self._TestWait(lambda: self._WaitAndReport(15.0))

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with a timeout and report afterwards.

    @param timeout: timeout passed to the wait call of the condition
    @param check: value put on the done queue once the wait has finished

    """
    cond = self.cond
    cond.acquire()
    cond.wait(timeout)
    cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    # Nobody notifies, both waiters have to return through their timeout
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    # A timeout of zero must return as well
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262
263
264 class TestSharedLock(_ThreadedTestCase):
265   """SharedLock tests"""
266
  def setUp(self):
    # Thread and queue bookkeeping comes from the base class; every test gets
    # its own, fresh lock
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLock")
270
271   def testSequenceAndOwnership(self):
272     self.assertFalse(self.sl._is_owned())
273     self.sl.acquire(shared=1)
274     self.assert_(self.sl._is_owned())
275     self.assert_(self.sl._is_owned(shared=1))
276     self.assertFalse(self.sl._is_owned(shared=0))
277     self.sl.release()
278     self.assertFalse(self.sl._is_owned())
279     self.sl.acquire()
280     self.assert_(self.sl._is_owned())
281     self.assertFalse(self.sl._is_owned(shared=1))
282     self.assert_(self.sl._is_owned(shared=0))
283     self.sl.release()
284     self.assertFalse(self.sl._is_owned())
285     self.sl.acquire(shared=1)
286     self.assert_(self.sl._is_owned())
287     self.assert_(self.sl._is_owned(shared=1))
288     self.assertFalse(self.sl._is_owned(shared=0))
289     self.sl.release()
290     self.assertFalse(self.sl._is_owned())
291
292   def testBooleanValue(self):
293     # semaphores are supposed to return a true value on a successful acquire
294     self.assert_(self.sl.acquire(shared=1))
295     self.sl.release()
296     self.assert_(self.sl.acquire())
297     self.sl.release()
298
299   def testDoubleLockingStoE(self):
300     self.sl.acquire(shared=1)
301     self.assertRaises(AssertionError, self.sl.acquire)
302
303   def testDoubleLockingEtoS(self):
304     self.sl.acquire()
305     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306
307   def testDoubleLockingStoS(self):
308     self.sl.acquire(shared=1)
309     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310
311   def testDoubleLockingEtoE(self):
312     self.sl.acquire()
313     self.assertRaises(AssertionError, self.sl.acquire)
314
315   # helper functions: called in a separate thread they acquire the lock, send
316   # their identifier on the done queue, then release it.
317   def _doItSharer(self):
318     try:
319       self.sl.acquire(shared=1)
320       self.done.put('SHR')
321       self.sl.release()
322     except errors.LockError:
323       self.done.put('ERR')
324
325   def _doItExclusive(self):
326     try:
327       self.sl.acquire()
328       self.done.put('EXC')
329       self.sl.release()
330     except errors.LockError:
331       self.done.put('ERR')
332
333   def _doItDelete(self):
334     try:
335       self.sl.delete()
336       self.done.put('DEL')
337     except errors.LockError:
338       self.done.put('ERR')
339
340   def testSharersCanCoexist(self):
341     self.sl.acquire(shared=1)
342     threading.Thread(target=self._doItSharer).start()
343     self.assert_(self.done.get(True, 1))
344     self.sl.release()
345
346   @_Repeat
347   def testExclusiveBlocksExclusive(self):
348     self.sl.acquire()
349     self._addThread(target=self._doItExclusive)
350     self.assertRaises(Queue.Empty, self.done.get_nowait)
351     self.sl.release()
352     self._waitThreads()
353     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354
355   @_Repeat
356   def testExclusiveBlocksDelete(self):
357     self.sl.acquire()
358     self._addThread(target=self._doItDelete)
359     self.assertRaises(Queue.Empty, self.done.get_nowait)
360     self.sl.release()
361     self._waitThreads()
362     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363     self.sl = locking.SharedLock(self.sl.name)
364
365   @_Repeat
366   def testExclusiveBlocksSharer(self):
367     self.sl.acquire()
368     self._addThread(target=self._doItSharer)
369     self.assertRaises(Queue.Empty, self.done.get_nowait)
370     self.sl.release()
371     self._waitThreads()
372     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373
374   @_Repeat
375   def testSharerBlocksExclusive(self):
376     self.sl.acquire(shared=1)
377     self._addThread(target=self._doItExclusive)
378     self.assertRaises(Queue.Empty, self.done.get_nowait)
379     self.sl.release()
380     self._waitThreads()
381     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382
383   @_Repeat
384   def testSharerBlocksDelete(self):
385     self.sl.acquire(shared=1)
386     self._addThread(target=self._doItDelete)
387     self.assertRaises(Queue.Empty, self.done.get_nowait)
388     self.sl.release()
389     self._waitThreads()
390     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391     self.sl = locking.SharedLock(self.sl.name)
392
393   @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # NOTE: the test is disabled by this early return; everything below is
    # unreachable and only kept for reference
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410
411   @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # NOTE: the test is disabled by this early return; everything below is
    # unreachable and only kept for reference
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')
428
429   def testDelete(self):
430     self.sl.delete()
431     self.assertRaises(errors.LockError, self.sl.acquire)
432     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433     self.assertRaises(errors.LockError, self.sl.delete)
434
435   def testDeleteTimeout(self):
436     self.sl.delete(timeout=60)
437
438   def testNoDeleteIfSharer(self):
439     self.sl.acquire(shared=1)
440     self.assertRaises(AssertionError, self.sl.delete)
441
442   @_Repeat
443   def testDeletePendingSharersExclusiveDelete(self):
444     self.sl.acquire()
445     self._addThread(target=self._doItSharer)
446     self._addThread(target=self._doItSharer)
447     self._addThread(target=self._doItExclusive)
448     self._addThread(target=self._doItDelete)
449     self.sl.delete()
450     self._waitThreads()
451     # The threads who were pending return ERR
452     for _ in range(4):
453       self.assertEqual(self.done.get_nowait(), 'ERR')
454     self.sl = locking.SharedLock(self.sl.name)
455
456   @_Repeat
457   def testDeletePendingDeleteExclusiveSharers(self):
458     self.sl.acquire()
459     self._addThread(target=self._doItDelete)
460     self._addThread(target=self._doItExclusive)
461     self._addThread(target=self._doItSharer)
462     self._addThread(target=self._doItSharer)
463     self.sl.delete()
464     self._waitThreads()
465     # The two threads who were pending return both ERR
466     self.assertEqual(self.done.get_nowait(), 'ERR')
467     self.assertEqual(self.done.get_nowait(), 'ERR')
468     self.assertEqual(self.done.get_nowait(), 'ERR')
469     self.assertEqual(self.done.get_nowait(), 'ERR')
470     self.sl = locking.SharedLock(self.sl.name)
471
472   @_Repeat
473   def testExclusiveAcquireTimeout(self):
474     for shared in [0, 1]:
475       on_queue = threading.Event()
476       release_exclusive = threading.Event()
477
478       def _LockExclusive():
479         self.sl.acquire(shared=0, test_notify=on_queue.set)
480         self.done.put("A: start wait")
481         release_exclusive.wait()
482         self.done.put("A: end wait")
483         self.sl.release()
484
485       # Start thread to hold lock in exclusive mode
486       self._addThread(target=_LockExclusive)
487
488       # Wait for wait to begin
489       self.assertEqual(self.done.get(timeout=60), "A: start wait")
490
491       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492       # on the queue
493       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494                                       test_notify=release_exclusive.set))
495
496       self.done.put("got 2nd")
497       self.sl.release()
498
499       self._waitThreads()
500
501       self.assertEqual(self.done.get_nowait(), "A: end wait")
502       self.assertEqual(self.done.get_nowait(), "got 2nd")
503       self.assertRaises(Queue.Empty, self.done.get_nowait)
504
505   @_Repeat
506   def testAcquireExpiringTimeout(self):
507     def _AcquireWithTimeout(shared, timeout):
508       if not self.sl.acquire(shared=shared, timeout=timeout):
509         self.done.put("timeout")
510
511     for shared in [0, 1]:
512       # Lock exclusively
513       self.sl.acquire()
514
515       # Start shared acquires with timeout between 0 and 20 ms
516       for i in range(11):
517         self._addThread(target=_AcquireWithTimeout,
518                         args=(shared, i * 2.0 / 1000.0))
519
520       # Wait for threads to finish (makes sure the acquire timeout expires
521       # before releasing the lock)
522       self._waitThreads()
523
524       # Release lock
525       self.sl.release()
526
527       for _ in range(11):
528         self.assertEqual(self.done.get_nowait(), "timeout")
529
530       self.assertRaises(Queue.Empty, self.done.get_nowait)
531
532   @_Repeat
533   def testSharedSkipExclusiveAcquires(self):
534     # Tests whether shared acquires jump in front of exclusive acquires in the
535     # queue.
536
537     def _Acquire(shared, name, notify_ev, wait_ev):
538       if notify_ev:
539         notify_fn = notify_ev.set
540       else:
541         notify_fn = None
542
543       if wait_ev:
544         wait_ev.wait()
545
546       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547         return
548
549       self.done.put(name)
550       self.sl.release()
551
552     # Get exclusive lock while we fill the queue
553     self.sl.acquire()
554
555     shrcnt1 = 5
556     shrcnt2 = 7
557     shrcnt3 = 9
558     shrcnt4 = 2
559
560     # Add acquires using threading.Event for synchronization. They'll be
561     # acquired exactly in the order defined in this list.
562     acquires = (shrcnt1 * [(1, "shared 1")] +
563                 3 * [(0, "exclusive 1")] +
564                 shrcnt2 * [(1, "shared 2")] +
565                 shrcnt3 * [(1, "shared 3")] +
566                 shrcnt4 * [(1, "shared 4")] +
567                 3 * [(0, "exclusive 2")])
568
569     ev_cur = None
570     ev_prev = None
571
572     for args in acquires:
573       ev_cur = threading.Event()
574       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575       ev_prev = ev_cur
576
577     # Wait for last acquire to start
578     ev_prev.wait()
579
580     # Expect 6 pending exclusive acquires and 1 for all shared acquires
581     # together
582     self.assertEqual(self.sl._count_pending(), 7)
583
584     # Release exclusive lock and wait
585     self.sl.release()
586
587     self._waitThreads()
588
589     # Check sequence
590     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591       # Shared locks aren't guaranteed to be notified in order, but they'll be
592       # first
593       tmp = self.done.get_nowait()
594       if tmp == "shared 1":
595         shrcnt1 -= 1
596       elif tmp == "shared 2":
597         shrcnt2 -= 1
598       elif tmp == "shared 3":
599         shrcnt3 -= 1
600       elif tmp == "shared 4":
601         shrcnt4 -= 1
602     self.assertEqual(shrcnt1, 0)
603     self.assertEqual(shrcnt2, 0)
604     self.assertEqual(shrcnt3, 0)
605     self.assertEqual(shrcnt3, 0)
606
607     for _ in range(3):
608       self.assertEqual(self.done.get_nowait(), "exclusive 1")
609
610     for _ in range(3):
611       self.assertEqual(self.done.get_nowait(), "exclusive 2")
612
613     self.assertRaises(Queue.Empty, self.done.get_nowait)
614
615   def testIllegalDowngrade(self):
616     # Not yet acquired
617     self.assertRaises(AssertionError, self.sl.downgrade)
618
619     # Acquire in shared mode, downgrade should be no-op
620     self.assertTrue(self.sl.acquire(shared=1))
621     self.assertTrue(self.sl._is_owned(shared=1))
622     self.assertTrue(self.sl.downgrade())
623     self.assertTrue(self.sl._is_owned(shared=1))
624     self.sl.release()
625
626   def testDowngrade(self):
627     self.assertTrue(self.sl.acquire())
628     self.assertTrue(self.sl._is_owned(shared=0))
629     self.assertTrue(self.sl.downgrade())
630     self.assertTrue(self.sl._is_owned(shared=1))
631     self.sl.release()
632
633   @_Repeat
  def testDowngradeJumpsAheadOfExclusive(self):
    # Scenario: one thread holds the lock exclusively, a second exclusive
    # acquire and five shared acquires are pending. Once the first thread
    # downgrades, the shared acquires get the lock while the second exclusive
    # acquire stays pending. All steps are synchronized through events.

    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
      # First owner: acquires exclusively, downgrades when told to and
      # releases when told to
      self.assertTrue(self.sl.acquire())
      self.assertTrue(self.sl._is_owned(shared=0))
      ev_got.set()
      ev_downgrade.wait()
      self.assertTrue(self.sl._is_owned(shared=0))
      self.assertTrue(self.sl.downgrade())
      self.assertTrue(self.sl._is_owned(shared=1))
      ev_release.wait()
      self.assertTrue(self.sl._is_owned(shared=1))
      self.sl.release()

    def _KeepExclusive2(ev_started, ev_release):
      # Second exclusive acquire; ev_started is passed as test_notify callback
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
      self.assertTrue(self.sl._is_owned(shared=0))
      ev_release.wait()
      self.assertTrue(self.sl._is_owned(shared=0))
      self.sl.release()

    def _KeepShared(ev_started, ev_got, ev_release):
      # Shared acquire; ev_started is passed as test_notify callback, ev_got
      # is set once the lock is actually held
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
      self.assertTrue(self.sl._is_owned(shared=1))
      ev_got.set()
      ev_release.wait()
      self.assertTrue(self.sl._is_owned(shared=1))
      self.sl.release()

    # Acquire lock in exclusive mode
    ev_got_excl1 = threading.Event()
    ev_downgrade_excl1 = threading.Event()
    ev_release_excl1 = threading.Event()
    th_excl1 = self._addThread(target=_KeepExclusive,
                               args=(ev_got_excl1, ev_downgrade_excl1,
                                     ev_release_excl1))
    ev_got_excl1.wait()

    # Start a second exclusive acquire
    ev_started_excl2 = threading.Event()
    ev_release_excl2 = threading.Event()
    th_excl2 = self._addThread(target=_KeepExclusive2,
                               args=(ev_started_excl2, ev_release_excl2))
    ev_started_excl2.wait()

    # Start shared acquires, will jump ahead of second exclusive acquire when
    # first exclusive acquire downgrades
    # One (started, got) pair of events per shared acquire
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
    ev_release_shared = threading.Event()

    th_shared = [self._addThread(target=_KeepShared,
                                 args=(ev_started, ev_got, ev_release_shared))
                 for (ev_started, ev_got) in ev_shared]

    # Wait for all shared acquires to start
    for (ev, _) in ev_shared:
      ev.wait()

    # Check lock information
    # (first thread is the exclusive owner; the second exclusive acquire and
    # all shared acquires are listed as pending)
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     (self.sl.name, "exclusive", [th_excl1.getName()], None))
    (_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [("exclusive", [th_excl2.getName()]),
                      ("shared", sorted(th.getName() for th in th_shared))])

    # Shared acquires won't start until the exclusive lock is downgraded
    ev_downgrade_excl1.set()

    # Wait for all shared acquires to be successful
    for (_, ev) in ev_shared:
      ev.wait()

    # Check lock information again
    # (lock is in shared mode now, only the second exclusive acquire is still
    # pending and the owners are the first thread plus all sharers)
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])),
                     (self.sl.name, "shared", None,
                      [("exclusive", [th_excl2.getName()])]))
    (_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER]))
    self.assertEqual(set(owner), set([th_excl1.getName()] +
                                     [th.getName() for th in th_shared]))

    # Let all threads release the lock
    ev_release_excl1.set()
    ev_release_excl2.set()
    ev_release_shared.set()

    self._waitThreads()

    # Nothing is owned or pending anymore
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER,
                                          query.LQ_PENDING])),
                     (self.sl.name, None, None, []))
724
725   @_Repeat
  def testMixedAcquireTimeout(self):
    # Scenario: three threads hold the lock in shared mode, then a thread
    # acquires it exclusively, then ten more shared acquires follow. In
    # between, exclusive acquires with a short timeout made by this thread
    # have to fail. The order of the reports on the done queue is verified at
    # the end.

    # Set to make the shared holders release the lock
    sync = threading.Event()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    # (has to time out, the lock is held in shared mode by the threads)
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    # (has to time out, the lock is held exclusively by the other thread)
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)
819
  def testPriority(self):
    # Queues a large number of shared and exclusive acquires with pseudo-random
    # priorities while the lock is held exclusively, then verifies the pending
    # information reported by the lock and the order in which the acquires
    # are served after the release.

    # Acquire in exclusive mode
    self.assert_(self.sl.acquire(shared=0))

    # Queue acquires
    def _Acquire(prev, next, shared, priority, result):
      # Waits for the previous acquire in the chain before acquiring; "next"
      # is passed as test_notify callback and lets the following thread in
      # the chain continue
      prev.wait()
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        # One shared and one exclusive acquire per chosen priority, in random
        # order
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     (self.sl.name, "exclusive",
                      [threading.currentThread().getName()], None))

    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assert_(self.sl._check_empty())

    # Check acquires by priority
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one.
        while names:
          # set.remove raises KeyError for a result not belonging to this
          # group, i.e. for an acquire served out of order
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)
915
916   def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
917     self.assertEqual(name, self.sl.name)
918     self.assert_(mode is None)
919     self.assert_(owner is None)
920
921     self.assertEqual([(pendmode, sorted(waiting))
922                       for (pendmode, waiting) in pending],
923                      [(["exclusive", "shared"][int(bool(shared))],
924                        sorted(t.getName() for t in threads))
925                       for acquires in [perprio[i]
926                                        for i in sorted(perprio.keys())]
927                       for (shared, _, threads) in acquires])
928
929
class TestSharedLockInCondition(_ThreadedTestCase):
  """Tests using a SharedLock as the lock of a condition"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Create the condition under test; overridden by subclasses.

    """
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    # Waiting on the condition must not change the mode the lock is held in,
    # neither for shared (1) nor for exclusive (0) owners
    for shared in (1, 0):
      self.cond.acquire(shared=shared)
      self.assertTrue(self.sl._is_owned(shared=shared))
      self.cond.wait(0)
      self.assertTrue(self.sl._is_owned(shared=shared))
      self.cond.release()
952
953
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """Run the SharedLock condition tests with a PipeCondition"""

  def setCondition(self):
    """Wrap the shared lock in a C{locking.PipeCondition}"""
    lock = self.sl
    self.cond = locking.PipeCondition(lock)
959
960
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test"""

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Report 'EXC' while running under the decorator without C{shared}"""
    self.assertTrue(_decoratorlock._is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Report 'SHR' while running under the decorator with C{shared=1}"""
    self.assertTrue(_decoratorlock._is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    # The lock must be released again once a decorated function returns
    self._doItExclusive()
    self.assertFalse(_decoratorlock._is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock._is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    threading.Thread(target=self._doItSharer).start()
    # The sharer must report back while we still hold the lock in shared mode
    self.assertTrue(self.done.get(True, 1))
    _decoratorlock.release()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItExclusive)
    # We still hold the lock, so the thread must not have reported anything
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')
1016
1017
class TestLockSet(_ThreadedTestCase):
  """LockSet tests"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self._setUpLS()

  def _setUpLS(self):
    """Helper to (re)initialize the lock set"""
    self.resources = ['one', 'two', 'three']
    self.ls = locking.LockSet(self.resources, "TestLockSet")

  def testResources(self):
    self.assertEqual(self.ls._names(), set(self.resources))
    newls = locking.LockSet([], "TestLockSet.testResources")
    self.assertEqual(newls._names(), set())

  def testAcquireRelease(self):
    self.assertTrue(self.ls.acquire('one'))
    self.assertEqual(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())
    self.assertEqual(self.ls.acquire(['one']), set(['one']))
    self.assertEqual(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())
    self.ls.acquire(['one', 'two', 'three'])
    self.assertEqual(self.ls._list_owned(), set(['one', 'two', 'three']))
    self.ls.release('one')
    self.assertEqual(self.ls._list_owned(), set(['two', 'three']))
    self.ls.release(['three'])
    self.assertEqual(self.ls._list_owned(), set(['two']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())
    self.assertEqual(self.ls.acquire(['one', 'three']), set(['one', 'three']))
    self.assertEqual(self.ls._list_owned(), set(['one', 'three']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())

  def testNoDoubleAcquire(self):
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
    self.ls.release()
    self.ls.acquire(['one', 'three'])
    self.ls.release('one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.ls.release('three')

  def testNoWrongRelease(self):
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.release, 'two')

  def testAddRemove(self):
    self.ls.add('four')
    self.assertEqual(self.ls._list_owned(), set())
    self.assertTrue('four' in self.ls._names())
    self.ls.add(['five', 'six', 'seven'], acquired=1)
    self.assertTrue('five' in self.ls._names())
    self.assertTrue('six' in self.ls._names())
    self.assertTrue('seven' in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set(['five', 'six', 'seven']))
    self.assertEqual(self.ls.remove(['five', 'six']), ['five', 'six'])
    self.assertTrue('five' not in self.ls._names())
    self.assertTrue('six' not in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set(['seven']))
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
    self.ls.remove('seven')
    self.assertTrue('seven' not in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set([]))
    self.ls.acquire(None, shared=1)
    self.assertRaises(AssertionError, self.ls.add, 'eight')
    self.ls.release()
    self.ls.acquire(None)
    self.ls.add('eight', acquired=1)
    self.assertTrue('eight' in self.ls._names())
    self.assertTrue('eight' in self.ls._list_owned())
    self.ls.add('nine')
    self.assertTrue('nine' in self.ls._names())
    self.assertTrue('nine' not in self.ls._list_owned())
    self.ls.release()
    self.ls.remove(['two'])
    self.assertTrue('two' not in self.ls._names())
    self.ls.acquire('three')
    self.assertEqual(self.ls.remove(['three']), ['three'])
    self.assertTrue('three' not in self.ls._names())
    self.assertEqual(self.ls.remove('three'), [])
    self.assertEqual(self.ls.remove(['one', 'three', 'six']), ['one'])
    self.assertTrue('one' not in self.ls._names())

  def testRemoveNonBlocking(self):
    self.ls.acquire('one')
    self.assertEqual(self.ls.remove('one'), ['one'])
    self.ls.acquire(['two', 'three'])
    self.assertEqual(self.ls.remove(['two', 'three']),
                     ['two', 'three'])

  def testNoDoubleAdd(self):
    self.assertRaises(errors.LockError, self.ls.add, 'two')
    self.ls.add('four')
    self.assertRaises(errors.LockError, self.ls.add, 'four')

  def testNoWrongRemoves(self):
    self.ls.acquire(['one', 'three'], shared=1)
    # Cannot remove 'two' while holding something which is not a superset
    self.assertRaises(AssertionError, self.ls.remove, 'two')
    # Cannot remove 'three' as we are sharing it
    self.assertRaises(AssertionError, self.ls.remove, 'three')

  def testAcquireSetLock(self):
    # acquire the set-lock exclusively
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    self.assertEqual(self.ls._list_owned(), set(['one', 'two', 'three']))
    self.assertEqual(self.ls._is_owned(), True)
    self.assertEqual(self.ls._names(), set(['one', 'two', 'three']))
    # I can still add/remove elements...
    self.assertEqual(self.ls.remove(['two', 'three']), ['two', 'three'])
    self.assertTrue(self.ls.add('six'))
    self.ls.release()
    # share the set-lock
    self.assertEqual(self.ls.acquire(None, shared=1), set(['one', 'six']))
    # adding new elements is not possible
    self.assertRaises(AssertionError, self.ls.add, 'five')
    self.ls.release()

  def testAcquireWithRepetitions(self):
    self.assertEqual(self.ls.acquire(['two', 'two', 'three'], shared=1),
                     set(['two', 'two', 'three']))
    self.ls.release(['two', 'two'])
    self.assertEqual(self.ls._list_owned(), set(['three']))

  def testEmptyAcquire(self):
    # Acquire an empty list of locks...
    self.assertEqual(self.ls.acquire([]), set())
    self.assertEqual(self.ls._list_owned(), set())
    # New locks can still be added
    self.assertTrue(self.ls.add('six'))
    # "re-acquiring" is not an issue, since we had really acquired nothing
    self.assertEqual(self.ls.acquire([], shared=1), set())
    self.assertEqual(self.ls._list_owned(), set())
    # We haven't really acquired anything, so we cannot release
    self.assertRaises(AssertionError, self.ls.release)

  def _doLockSet(self, names, shared):
    """Acquire and release C{names}, reporting 'DONE' or 'ERR' on the queue"""
    try:
      self.ls.acquire(names, shared=shared)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doAddSet(self, names):
    """Add C{names} already acquired, reporting 'DONE' or 'ERR' on the queue"""
    try:
      self.ls.add(names, acquired=1)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doRemoveSet(self, names):
    """Remove C{names} and put the result of the removal on the queue"""
    self.done.put(self.ls.remove(names))

  @_Repeat
  def testConcurrentSharedAcquire(self):
    self.ls.acquire(['one', 'two'], shared=1)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testConcurrentExclusiveAcquire(self):
    self.ls.acquire(['one', 'two'])
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 0))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._addThread(target=self._doLockSet, args=('one', 0))
    self._addThread(target=self._doLockSet, args=('one', 1))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(6):
      self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testSimpleAcquireTimeoutExpiring(self):
    names = sorted(self.ls._names())
    self.assertTrue(len(names) >= 3)

    # Get name of first lock
    first = names[0]

    # Get name of last lock
    last = names[-1]

    # Each entry is (lock(s) the thread wants, lock blocked by this thread)
    checks = [
      # Block first and try to lock it again
      (first, first),

      # Block first and try to lock all locks
      (None, first),

      # Block last and try to lock it again
      (last, last),
      ]

    for (wanted, block) in checks:
      # Lock in exclusive mode
      self.assertTrue(self.ls.acquire(block, shared=0))

      def _AcquireOne():
        # Try to get the same lock again with a timeout (should never succeed)
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
        if acquired:
          self.done.put("acquired")
          self.ls.release()
        else:
          self.assertTrue(acquired is None)
          self.assertFalse(self.ls._list_owned())
          self.assertFalse(self.ls._is_owned())
          self.done.put("not acquired")

      self._addThread(target=_AcquireOne)

      # Wait for timeout in thread to expire
      self._waitThreads()

      # Release exclusive lock again
      self.ls.release()

      self.assertEqual(self.done.get_nowait(), "not acquired")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testDelayedAndExpiringLockAcquire(self):
    self._setUpLS()
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])

    for expire in (False, True):
      names = sorted(self.ls._names())
      self.assertEqual(len(names), 8)

      # One event per lock, set once the thread starts acquiring that lock
      lock_ev = dict((name, threading.Event()) for name in names)

      # Lock all in exclusive mode
      self.assertTrue(self.ls.acquire(names, shared=0))

      if expire:
        # We'll wait at least 300ms per lock
        lockwait = len(names) * [0.3]

        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
        # this gives us up to 2.4s to fail.
        lockall_timeout = 0.4
      else:
        # This should finish rather quickly
        lockwait = None
        lockall_timeout = len(names) * 5.0

      def _LockAll():
        def acquire_notification(name):
          if not expire:
            self.done.put("getting %s" % name)

          # Kick next lock
          lock_ev[name].set()

        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                           test_notify=acquire_notification):
          self.done.put("got all")
          self.ls.release()
        else:
          self.done.put("timeout on all")

        # Notify all locks
        for ev in lock_ev.values():
          ev.set()

      t = self._addThread(target=_LockAll)

      for idx, name in enumerate(names):
        # Wait for actual acquire on this lock to start
        lock_ev[name].wait(10.0)

        if expire and t.isAlive():
          # Wait some time after getting the notification to make sure the lock
          # acquire will expire
          SafeSleep(lockwait[idx])

        self.ls.release(names=name)

      self.assertFalse(self.ls._list_owned())

      self._waitThreads()

      if expire:
        # Not checking which locks were actually acquired. Doing so would be
        # too timing-dependent.
        self.assertEqual(self.done.get_nowait(), "timeout on all")
      else:
        for i in names:
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
        self.assertEqual(self.done.get_nowait(), "got all")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testConcurrentRemove(self):
    self.ls.add('four')
    self.ls.acquire(['one', 'two', 'four'])
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('one')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.ls.add(['five', 'six'], acquired=1)
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
    self.ls.remove('five')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(['three', 'four'])
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('four')
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['six'])
    # Note the trailing comma: args must be a tuple holding the list
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    self.ls.release()
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentSharedSetLock(self):
    # share the set-lock...
    self.ls.acquire(None, shared=1)
    # ...another thread can share it too
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...or just share some elements
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...but not add new ones or remove any
    t = self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    # this just releases the set-lock
    self.ls.release([])
    t.join(60)
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # release the lock on the actual elements so remove() can proceed too
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentExclusiveSetLock(self):
    # acquire the set-lock...
    self.ls.acquire(None, shared=0)
    # ...no one can do anything else
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(['three'], 0))
    self._addThread(target=self._doLockSet, args=(['two'], 1))
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(5):
      self.assertEqual(self.done.get(True, 1), 'DONE')
    # cleanup
    self._setUpLS()

  @_Repeat
  def testConcurrentSetLockAdd(self):
    self.ls.acquire('one')
    # Another thread wants the whole SetLock
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(AssertionError, self.ls.add, 'four')
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(None)
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.add('four')
    self.ls.add('five', acquired=1)
    self.ls.add('six', acquired=1, shared=1)
    self.assertEqual(self.ls._list_owned(),
                     set(['one', 'two', 'three', 'five', 'six']))
    self.assertEqual(self.ls._is_owned(), True)
    self.assertEqual(self.ls._names(),
                     set(['one', 'two', 'three', 'four', 'five', 'six']))
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  @_Repeat
  def testEmptyLockSet(self):
    # get the set-lock
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    # now empty it...
    self.ls.remove(['one', 'two', 'three'])
    # and adds/locks by another thread still wait
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    # empty it again...
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
    # now share it...
    self.assertEqual(self.ls.acquire(None, shared=1), set())
    # other sharers can go, adds still wait
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  def testAcquireWithNamesDowngrade(self):
    self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
    self.assertTrue(self.ls._is_owned())
    self.assertFalse(self.ls._get_lock()._is_owned())
    self.ls.release()
    self.assertFalse(self.ls._is_owned())
    self.assertFalse(self.ls._get_lock()._is_owned())
    # Can't downgrade after releasing
    self.assertRaises(AssertionError, self.ls.downgrade, "two")

  def testDowngrade(self):
    # Not owning anything, must raise an exception
    self.assertFalse(self.ls._is_owned())
    self.assertRaises(AssertionError, self.ls.downgrade)

    self.assertFalse(compat.any(i._is_owned()
                                for i in self.ls._get_lockdict().values()))

    self.assertEqual(self.ls.acquire(None, shared=0),
                     set(["one", "two", "three"]))
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")

    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
    self.assertTrue(compat.all(i._is_owned(shared=0)
                               for i in self.ls._get_lockdict().values()))

    # Start downgrading locks
    self.assertTrue(self.ls.downgrade(names=["one"]))
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
    self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertTrue(self.ls.downgrade(names="two"))
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
    self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    # Downgrading the last exclusive lock to shared must downgrade the
    # lockset-internal lock too
    self.assertTrue(self.ls.downgrade(names="three"))
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
    self.assertTrue(compat.all(i._is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    # Downgrading a shared lock must be a no-op
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
    self.assertTrue(compat.all(i._is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    self.ls.release()

  def testPriority(self):
    def _Acquire(prev_ev, next_ev, name, priority, success_fn):
      # Start only once the previous thread has begun its own acquire
      prev_ev.wait()
      self.assertTrue(self.ls.acquire(name, shared=0,
                                      priority=priority,
                                      test_notify=lambda _: next_ev.set()))
      try:
        success_fn()
      finally:
        self.ls.release()

    # Get all in exclusive mode
    self.assertTrue(self.ls.acquire(locking.ALL_SET, shared=0))

    done_two = Queue.Queue(0)

    first = threading.Event()
    prev = first

    acquires = [("one", prio, self.done) for prio in range(1, 33)]
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])

    # Use a deterministic random generator
    random.Random(741).shuffle(acquires)

    # Chain the threads through events so they start acquiring one by one
    for (name, prio, done) in acquires:
      ev = threading.Event()
      self._addThread(target=_Acquire,
                      args=(prev, ev, name, prio,
                            compat.partial(done.put, "Prio%s" % prio)))
      prev = ev

    # Start acquires
    first.set()

    # Wait for last acquire to start
    prev.wait()

    # Let threads acquire locks
    self.ls.release()

    # Wait for threads to finish
    self._waitThreads()

    for i in range(1, 33):
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)

    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1592
1593
1594 class TestGanetiLockManager(_ThreadedTestCase):
1595
1596   def setUp(self):
1597     _ThreadedTestCase.setUp(self)
1598     self.nodes=['n1', 'n2']
1599     self.nodegroups=['g1', 'g2']
1600     self.instances=['i1', 'i2', 'i3']
1601     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1602                                         self.instances)
1603
1604   def tearDown(self):
1605     # Don't try this at home...
1606     locking.GanetiLockManager._instance = None
1607
1608   def testLockingConstants(self):
1609     # The locking library internally cheats by assuming its constants have some
1610     # relationships with each other. Check those hold true.
1611     # This relationship is also used in the Processor to recursively acquire
1612     # the right locks. Again, please don't break it.
1613     for i in range(len(locking.LEVELS)):
1614       self.assertEqual(i, locking.LEVELS[i])
1615
1616   def testDoubleGLFails(self):
1617     self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1618
1619   def testLockNames(self):
1620     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1621     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1622     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1623                      set(self.nodegroups))
1624     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1625                      set(self.instances))
1626
1627   def testInitAndResources(self):
1628     locking.GanetiLockManager._instance = None
1629     self.GL = locking.GanetiLockManager([], [], [])
1630     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1631     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1632     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1633     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1634
1635     locking.GanetiLockManager._instance = None
1636     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1637     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1638     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1639     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1640                                     set(self.nodegroups))
1641     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1642
1643     locking.GanetiLockManager._instance = None
1644     self.GL = locking.GanetiLockManager([], [], self.instances)
1645     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1646     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1647     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1648     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1649                      set(self.instances))
1650
1651   def testAcquireRelease(self):
1652     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1653     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1654     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1655     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1656     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1657     self.GL.release(locking.LEVEL_NODE, ['n2'])
1658     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1659     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1660     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1661     self.GL.release(locking.LEVEL_NODE)
1662     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1663     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1664     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1665     self.GL.release(locking.LEVEL_NODEGROUP)
1666     self.GL.release(locking.LEVEL_INSTANCE)
1667     self.assertRaises(errors.LockError, self.GL.acquire,
1668                       locking.LEVEL_INSTANCE, ['i5'])
1669     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1670     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1671
1672   def testAcquireWholeSets(self):
1673     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1674     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1675                       set(self.instances))
1676     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1677                       set(self.instances))
1678     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1679                       set(self.nodegroups))
1680     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1681                       set(self.nodegroups))
1682     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1683                       set(self.nodes))
1684     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1685                       set(self.nodes))
1686     self.GL.release(locking.LEVEL_NODE)
1687     self.GL.release(locking.LEVEL_NODEGROUP)
1688     self.GL.release(locking.LEVEL_INSTANCE)
1689     self.GL.release(locking.LEVEL_CLUSTER)
1690
1691   def testAcquireWholeAndPartial(self):
1692     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1693     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1694                       set(self.instances))
1695     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1696                       set(self.instances))
1697     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1698                       set(['n2']))
1699     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1700                       set(['n2']))
1701     self.GL.release(locking.LEVEL_NODE)
1702     self.GL.release(locking.LEVEL_INSTANCE)
1703     self.GL.release(locking.LEVEL_CLUSTER)
1704
1705   def testBGLDependency(self):
1706     self.assertRaises(AssertionError, self.GL.acquire,
1707                       locking.LEVEL_NODE, ['n1', 'n2'])
1708     self.assertRaises(AssertionError, self.GL.acquire,
1709                       locking.LEVEL_INSTANCE, ['i3'])
1710     self.assertRaises(AssertionError, self.GL.acquire,
1711                       locking.LEVEL_NODEGROUP, ['g1'])
1712     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1713     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1714     self.assertRaises(AssertionError, self.GL.release,
1715                       locking.LEVEL_CLUSTER, ['BGL'])
1716     self.assertRaises(AssertionError, self.GL.release,
1717                       locking.LEVEL_CLUSTER)
1718     self.GL.release(locking.LEVEL_NODE)
1719     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1720     self.assertRaises(AssertionError, self.GL.release,
1721                       locking.LEVEL_CLUSTER, ['BGL'])
1722     self.assertRaises(AssertionError, self.GL.release,
1723                       locking.LEVEL_CLUSTER)
1724     self.GL.release(locking.LEVEL_INSTANCE)
1725     self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1726     self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1727     self.assertRaises(AssertionError, self.GL.release,
1728                       locking.LEVEL_CLUSTER, ['BGL'])
1729     self.assertRaises(AssertionError, self.GL.release,
1730                       locking.LEVEL_CLUSTER)
1731     self.GL.release(locking.LEVEL_NODEGROUP)
1732     self.GL.release(locking.LEVEL_CLUSTER)
1733
1734   def testWrongOrder(self):
1735     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1736     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1737     self.assertRaises(AssertionError, self.GL.acquire,
1738                       locking.LEVEL_NODE, ['n1'])
1739     self.assertRaises(AssertionError, self.GL.acquire,
1740                       locking.LEVEL_NODEGROUP, ['g1'])
1741     self.assertRaises(AssertionError, self.GL.acquire,
1742                       locking.LEVEL_INSTANCE, ['i2'])
1743
1744   def testModifiableLevels(self):
1745     self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1746                       ['BGL2'])
1747     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1748     self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1749     self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1750     self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1751     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1752     self.GL.add(locking.LEVEL_NODE, ['n3'])
1753     self.GL.remove(locking.LEVEL_NODE, ['n1'])
1754     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1755     self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1756     self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1757     self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1758     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1759     self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1760                       ['BGL2'])
1761
1762   # Helper function to run as a thread that shared the BGL and then acquires
1763   # some locks at another level.
1764   def _doLock(self, level, names, shared):
1765     try:
1766       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1767       self.GL.acquire(level, names, shared=shared)
1768       self.done.put('DONE')
1769       self.GL.release(level)
1770       self.GL.release(locking.LEVEL_CLUSTER)
1771     except errors.LockError:
1772       self.done.put('ERR')
1773
1774   @_Repeat
1775   def testConcurrency(self):
1776     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1777     self._addThread(target=self._doLock,
1778                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1779     self._waitThreads()
1780     self.assertEqual(self.done.get_nowait(), 'DONE')
1781     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1782     self._addThread(target=self._doLock,
1783                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1784     self._waitThreads()
1785     self.assertEqual(self.done.get_nowait(), 'DONE')
1786     self._addThread(target=self._doLock,
1787                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1788     self.assertRaises(Queue.Empty, self.done.get_nowait)
1789     self.GL.release(locking.LEVEL_INSTANCE)
1790     self._waitThreads()
1791     self.assertEqual(self.done.get_nowait(), 'DONE')
1792     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1793     self._addThread(target=self._doLock,
1794                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1795     self._waitThreads()
1796     self.assertEqual(self.done.get_nowait(), 'DONE')
1797     self._addThread(target=self._doLock,
1798                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1799     self.assertRaises(Queue.Empty, self.done.get_nowait)
1800     self.GL.release(locking.LEVEL_INSTANCE)
1801     self._waitThreads()
1802     self.assertEqual(self.done.get(True, 1), 'DONE')
1803     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1804
1805
class TestLockMonitor(_ThreadedTestCase):
  """Tests for the lock monitor and the lock information it reports."""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()

  @staticmethod
  def _Row(*values):
    """Build an expected result row whose fields all have status RS_NORMAL.

    @param values: the expected field values, in query order
    @return: list of (status, value) tuples

    """
    return [(constants.RS_NORMAL, value) for value in values]

  def _QueryData(self, fields):
    """Query the monitor and return the data rows of the decoded response.

    @param fields: list of field names to query
    @return: the C{data} attribute of the decoded query response

    """
    result = self.lm.QueryLocks(fields)
    return objects.QueryResponse.FromDict(result).data

  def testSingleThread(self):
    """Locks show up in the monitor and vanish once they are collected."""
    locks = [locking.SharedLock("TestLock%s" % i, monitor=self.lm)
             for i in range(100)]

    self.assertEqual(len(self.lm._locks), len(locks))
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
    self.assertEqual(len(result.fields), 1)
    self.assertEqual(len(result.data), 100)

    # Delete all locks
    del locks[:]

    # The garbage collector might need some time
    def _CheckLocks():
      if self.lm._locks:
        raise utils.RetryAgain()

    utils.Retry(_CheckLocks, 0.1, 30.0)

    self.assertFalse(self.lm._locks)

  def testMultiThread(self):
    """Create and acquire monitored locks from several threads.

    Locks are created by a chain of threads, then queried while unowned,
    while owned exclusively by this thread and while owned by helper threads
    in shared and exclusive mode.

    """
    locks = []

    def _CreateLock(wait_ev, done_ev, name):
      # Each thread waits for its predecessor's event, which fixes the order
      # in which the locks are created
      wait_ev.wait()
      locks.append(locking.SharedLock(name, monitor=self.lm))
      if done_ev:
        done_ev.set()

    expnames = []

    first = threading.Event()
    prev = first

    # Use a deterministic random generator
    for i in random.Random(4263).sample(range(100), 33):
      name = "MtTestLock%s" % i
      expnames.append(name)

      ev = threading.Event()
      self._addThread(target=_CreateLock, args=(prev, ev, name))
      prev = ev

    # Add locks
    first.set()
    self._waitThreads()

    # Check order in which locks were added
    self.assertEqual([lock.name for lock in locks], expnames)

    # Query results are compared against the nicely sorted names
    sorted_names = utils.NiceSort(expnames)

    # Check query result
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
    self.assertTrue(isinstance(result, dict))
    response = objects.QueryResponse.FromDict(result)
    self.assertEqual(response.data,
                     [self._Row(name, None, None, [])
                      for name in sorted_names])
    self.assertEqual(len(response.fields), 4)
    self.assertEqual(["name", "mode", "owner", "pending"],
                     [fdef.name for fdef in response.fields])

    # Test exclusive acquire
    for tlock in locks[::4]:
      tlock.acquire(shared=0)
      try:
        owner = [threading.currentThread().getName()]
        expected = []
        for name in sorted_names:
          if name == tlock.name:
            expected.append(self._Row(name, "exclusive", owner, []))
          else:
            expected.append(self._Row(name, None, None, []))

        self.assertEqual(
          self._QueryData(["name", "mode", "owner", "pending"]), expected)
      finally:
        tlock.release()

    # Test shared acquire
    def _Acquire(lock, shared, ev, notify):
      lock.acquire(shared=shared)
      try:
        # Tell the main thread the lock is owned, then keep it until told
        # to let go
        notify.set()
        ev.wait()
      finally:
        lock.release()

    for tlock1 in locks[::11]:
      for tlock2 in locks[::-15]:
        if tlock2 == tlock1:
          # Avoid deadlocks
          continue

        for tlock3 in locks[::10]:
          if tlock3 in (tlock2, tlock1):
            # Avoid deadlocks
            continue

          releaseev = threading.Event()

          # Acquire locks
          acquireev = []
          tthreads1 = []
          for _ in range(3):
            ev = threading.Event()
            tthreads1.append(self._addThread(target=_Acquire,
                                             args=(tlock1, 1, releaseev, ev)))
            acquireev.append(ev)

          ev = threading.Event()
          tthread2 = self._addThread(target=_Acquire,
                                     args=(tlock2, 1, releaseev, ev))
          acquireev.append(ev)

          ev = threading.Event()
          tthread3 = self._addThread(target=_Acquire,
                                     args=(tlock3, 0, releaseev, ev))
          acquireev.append(ev)

          # Wait for all locks to be acquired
          for acquired in acquireev:
            acquired.wait()

          # Check query result
          for (name, mode, owner) in self._QueryData(["name", "mode",
                                                      "owner"]):
            (name_status, name_value) = name
            (owner_status, owner_value) = owner

            self.assertEqual(name_status, constants.RS_NORMAL)
            self.assertEqual(owner_status, constants.RS_NORMAL)

            if name_value == tlock1.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(set(owner_value),
                               set(thread.getName() for thread in tthreads1))
              continue

            if name_value == tlock2.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(owner_value, [tthread2.getName()])
              continue

            if name_value == tlock3.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
              self.assertEqual(owner_value, [tthread3.getName()])
              continue

            # All remaining locks must be unowned
            self.assertTrue(name_value in expnames)
            self.assertEqual(mode, (constants.RS_NORMAL, None))
            self.assertTrue(owner_value is None)

          # Release locks again
          releaseev.set()

          self._waitThreads()

          self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                           [self._Row(name, None, None)
                            for name in sorted_names])

  def testDelete(self):
    """A deleted lock is still reported while referenced, as "deleted"."""
    lock = locking.SharedLock("TestLock", monitor=self.lm)

    self.assertEqual(len(self.lm._locks), 1)
    self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                     [self._Row(lock.name, None, None)])

    lock.delete()

    self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                     [self._Row(lock.name, "deleted", None)])
    self.assertEqual(len(self.lm._locks), 1)

  def testPending(self):
    """Waiting acquires are reported in the "pending" field.

    The lock is owned exclusively by this thread while five helper threads
    queue up for it, once all in exclusive and once all in shared mode.

    """
    def _Acquire(lock, shared, wait_ev, notify_ev):
      # Start only after the previous helper has begun waiting for the lock
      wait_ev.wait()

      lock.acquire(shared=shared, test_notify=notify_ev.set)
      lock.release()

    lock = locking.SharedLock("ExcLock", monitor=self.lm)
    main_name = threading.currentThread().getName()

    for shared in [0, 1]:
      lock.acquire()
      try:
        self.assertEqual(len(self.lm._locks), 1)
        self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                         [self._Row(lock.name, "exclusive", [main_name])])

        threads = []

        first = threading.Event()
        prev = first

        for _ in range(5):
          ev = threading.Event()
          threads.append(self._addThread(target=_Acquire,
                                          args=(lock, shared, prev, ev)))
          prev = ev

        # Start acquires
        first.set()

        # Wait for last acquire to start waiting
        prev.wait()

        # NOTE: This works only because QueryLocks will acquire the
        # lock-internal lock again and won't be able to get the information
        # until it has the lock. By then the acquire should be registered in
        # SharedLock.__pending (otherwise it's a bug).

        # All acquires are waiting now
        if shared:
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
        else:
          pending = [("exclusive", [t.getName()]) for t in threads]

        self.assertEqual(
          self._QueryData(["name", "mode", "owner", "pending"]),
          [self._Row(lock.name, "exclusive", [main_name], pending)])

        self.assertEqual(len(self.lm._locks), 1)
      finally:
        lock.release()

      self._waitThreads()

      # No pending acquires
      self.assertEqual(self._QueryData(["name", "mode", "owner", "pending"]),
                       [self._Row(lock.name, None, None, [])])

      self.assertEqual(len(self.lm._locks), 1)

  def testDeleteAndRecreate(self):
    """Several locks sharing one name are tracked and reported separately."""
    lname = "TestLock101923193"

    # Create some locks with the same name and keep all references
    locks = [locking.SharedLock(lname, monitor=self.lm)
             for _ in range(5)]

    self.assertEqual(len(self.lm._locks), len(locks))

    unowned = self._Row(lname, None, None)
    deleted = self._Row(lname, "deleted", None)

    self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                     [unowned] * 5)

    locks[2].delete()

    # Check information order
    self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                     [unowned] * 2 + [deleted] + [unowned] * 2)

    locks[1].acquire(shared=0)

    last_status = [
      unowned,
      self._Row(lname, "exclusive", [threading.currentThread().getName()]),
      deleted,
      unowned,
      unowned,
      ]

    # Check information order
    self.assertEqual(self._QueryData(["name", "mode", "owner"]), last_status)

    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
    self.assertEqual(len(self.lm._locks), len(locks))

    # Check lock deletion
    for idx in range(len(locks)):
      # Drop the oldest remaining lock; its row must disappear from the front
      del locks[0]
      assert gc.isenabled()
      gc.collect()
      self.assertEqual(len(self.lm._locks), len(locks))
      self.assertEqual(self._QueryData(["name", "mode", "owner"]),
                       last_status[idx + 1:])

    # All locks should have been deleted
    assert not locks
    self.assertFalse(self.lm._locks)

    self.assertEqual(self._QueryData(["name", "mode", "owner"]), [])
2156
2157
# Hand over to the project's test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()