Add master IP turnup and turndown hooks
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
45 # This is used to test the ssynchronize decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
48
49 #: List for looping tests
50 ITERATIONS = range(8)
51
52
53 def _Repeat(fn):
54   """Decorator for executing a function many times"""
55   def wrapper(*args, **kwargs):
56     for i in ITERATIONS:
57       fn(*args, **kwargs)
58   return wrapper
59
60
61 def SafeSleep(duration):
62   start = time.time()
63   while True:
64     delay = start + duration - time.time()
65     if delay <= 0.0:
66       break
67     time.sleep(delay)
68
69
70 class _ThreadedTestCase(unittest.TestCase):
71   """Test class that supports adding/waiting on threads"""
72   def setUp(self):
73     unittest.TestCase.setUp(self)
74     self.done = Queue.Queue(0)
75     self.threads = []
76
77   def _addThread(self, *args, **kwargs):
78     """Create and remember a new thread"""
79     t = threading.Thread(*args, **kwargs)
80     self.threads.append(t)
81     t.start()
82     return t
83
84   def _waitThreads(self):
85     """Wait for all our threads to finish"""
86     for t in self.threads:
87       t.join(60)
88       self.failIf(t.isAlive())
89     self.threads = []
90
91
92 class _ConditionTestCase(_ThreadedTestCase):
93   """Common test case for conditions"""
94
95   def setUp(self, cls):
96     _ThreadedTestCase.setUp(self)
97     self.lock = threading.Lock()
98     self.cond = cls(self.lock)
99
100   def _testAcquireRelease(self):
101     self.assertFalse(self.cond._is_owned())
102     self.assertRaises(RuntimeError, self.cond.wait, None)
103     self.assertRaises(RuntimeError, self.cond.notifyAll)
104
105     self.cond.acquire()
106     self.assert_(self.cond._is_owned())
107     self.cond.notifyAll()
108     self.assert_(self.cond._is_owned())
109     self.cond.release()
110
111     self.assertFalse(self.cond._is_owned())
112     self.assertRaises(RuntimeError, self.cond.wait, None)
113     self.assertRaises(RuntimeError, self.cond.notifyAll)
114
115   def _testNotification(self):
116     def _NotifyAll():
117       self.done.put("NE")
118       self.cond.acquire()
119       self.done.put("NA")
120       self.cond.notifyAll()
121       self.done.put("NN")
122       self.cond.release()
123
124     self.cond.acquire()
125     self._addThread(target=_NotifyAll)
126     self.assertEqual(self.done.get(True, 1), "NE")
127     self.assertRaises(Queue.Empty, self.done.get_nowait)
128     self.cond.wait(None)
129     self.assertEqual(self.done.get(True, 1), "NA")
130     self.assertEqual(self.done.get(True, 1), "NN")
131     self.assert_(self.cond._is_owned())
132     self.cond.release()
133     self.assertFalse(self.cond._is_owned())
134
135
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137   """SingleNotifyPipeCondition tests"""
138
139   def setUp(self):
140     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141
142   def testAcquireRelease(self):
143     self._testAcquireRelease()
144
145   def testNotification(self):
146     self._testNotification()
147
148   def testWaitReuse(self):
149     self.cond.acquire()
150     self.cond.wait(0)
151     self.cond.wait(0.1)
152     self.cond.release()
153
154   def testNoNotifyReuse(self):
155     self.cond.acquire()
156     self.cond.notifyAll()
157     self.assertRaises(RuntimeError, self.cond.wait, None)
158     self.assertRaises(RuntimeError, self.cond.notifyAll)
159     self.cond.release()
160
161
162 class TestPipeCondition(_ConditionTestCase):
163   """PipeCondition tests"""
164
165   def setUp(self):
166     _ConditionTestCase.setUp(self, locking.PipeCondition)
167
168   def testAcquireRelease(self):
169     self._testAcquireRelease()
170
171   def testNotification(self):
172     self._testNotification()
173
174   def _TestWait(self, fn):
175     threads = [
176       self._addThread(target=fn),
177       self._addThread(target=fn),
178       self._addThread(target=fn),
179       ]
180
181     # Wait for threads to be waiting
182     for _ in threads:
183       self.assertEqual(self.done.get(True, 1), "A")
184
185     self.assertRaises(Queue.Empty, self.done.get_nowait)
186
187     self.cond.acquire()
188     self.assertEqual(len(self.cond._waiters), 3)
189     self.assertEqual(self.cond._waiters, set(threads))
190     # This new thread can't acquire the lock, and thus call wait, before we
191     # release it
192     self._addThread(target=fn)
193     self.cond.notifyAll()
194     self.assertRaises(Queue.Empty, self.done.get_nowait)
195     self.cond.release()
196
197     # We should now get 3 W and 1 A (for the new thread) in whatever order
198     w = 0
199     a = 0
200     for i in range(4):
201       got = self.done.get(True, 1)
202       if got == "W":
203         w += 1
204       elif got == "A":
205         a += 1
206       else:
207         self.fail("Got %s on the done queue" % got)
208
209     self.assertEqual(w, 3)
210     self.assertEqual(a, 1)
211
212     self.cond.acquire()
213     self.cond.notifyAll()
214     self.cond.release()
215     self._waitThreads()
216     self.assertEqual(self.done.get_nowait(), "W")
217     self.assertRaises(Queue.Empty, self.done.get_nowait)
218
219   def testBlockingWait(self):
220     def _BlockingWait():
221       self.cond.acquire()
222       self.done.put("A")
223       self.cond.wait(None)
224       self.cond.release()
225       self.done.put("W")
226
227     self._TestWait(_BlockingWait)
228
229   def testLongTimeoutWait(self):
230     def _Helper():
231       self.cond.acquire()
232       self.done.put("A")
233       self.cond.wait(15.0)
234       self.cond.release()
235       self.done.put("W")
236
237     self._TestWait(_Helper)
238
239   def _TimeoutWait(self, timeout, check):
240     self.cond.acquire()
241     self.cond.wait(timeout)
242     self.cond.release()
243     self.done.put(check)
244
245   def testShortTimeoutWait(self):
246     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248     self._waitThreads()
249     self.assertEqual(self.done.get_nowait(), "T1")
250     self.assertEqual(self.done.get_nowait(), "T1")
251     self.assertRaises(Queue.Empty, self.done.get_nowait)
252
253   def testZeroTimeoutWait(self):
254     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257     self._waitThreads()
258     self.assertEqual(self.done.get_nowait(), "T0")
259     self.assertEqual(self.done.get_nowait(), "T0")
260     self.assertEqual(self.done.get_nowait(), "T0")
261     self.assertRaises(Queue.Empty, self.done.get_nowait)
262
263
264 class TestSharedLock(_ThreadedTestCase):
265   """SharedLock tests"""
266
267   def setUp(self):
268     _ThreadedTestCase.setUp(self)
269     self.sl = locking.SharedLock("TestSharedLock")
270
271   def testSequenceAndOwnership(self):
272     self.assertFalse(self.sl._is_owned())
273     self.sl.acquire(shared=1)
274     self.assert_(self.sl._is_owned())
275     self.assert_(self.sl._is_owned(shared=1))
276     self.assertFalse(self.sl._is_owned(shared=0))
277     self.sl.release()
278     self.assertFalse(self.sl._is_owned())
279     self.sl.acquire()
280     self.assert_(self.sl._is_owned())
281     self.assertFalse(self.sl._is_owned(shared=1))
282     self.assert_(self.sl._is_owned(shared=0))
283     self.sl.release()
284     self.assertFalse(self.sl._is_owned())
285     self.sl.acquire(shared=1)
286     self.assert_(self.sl._is_owned())
287     self.assert_(self.sl._is_owned(shared=1))
288     self.assertFalse(self.sl._is_owned(shared=0))
289     self.sl.release()
290     self.assertFalse(self.sl._is_owned())
291
292   def testBooleanValue(self):
293     # semaphores are supposed to return a true value on a successful acquire
294     self.assert_(self.sl.acquire(shared=1))
295     self.sl.release()
296     self.assert_(self.sl.acquire())
297     self.sl.release()
298
299   def testDoubleLockingStoE(self):
300     self.sl.acquire(shared=1)
301     self.assertRaises(AssertionError, self.sl.acquire)
302
303   def testDoubleLockingEtoS(self):
304     self.sl.acquire()
305     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306
307   def testDoubleLockingStoS(self):
308     self.sl.acquire(shared=1)
309     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310
311   def testDoubleLockingEtoE(self):
312     self.sl.acquire()
313     self.assertRaises(AssertionError, self.sl.acquire)
314
315   # helper functions: called in a separate thread they acquire the lock, send
316   # their identifier on the done queue, then release it.
317   def _doItSharer(self):
318     try:
319       self.sl.acquire(shared=1)
320       self.done.put('SHR')
321       self.sl.release()
322     except errors.LockError:
323       self.done.put('ERR')
324
325   def _doItExclusive(self):
326     try:
327       self.sl.acquire()
328       self.done.put('EXC')
329       self.sl.release()
330     except errors.LockError:
331       self.done.put('ERR')
332
333   def _doItDelete(self):
334     try:
335       self.sl.delete()
336       self.done.put('DEL')
337     except errors.LockError:
338       self.done.put('ERR')
339
340   def testSharersCanCoexist(self):
341     self.sl.acquire(shared=1)
342     threading.Thread(target=self._doItSharer).start()
343     self.assert_(self.done.get(True, 1))
344     self.sl.release()
345
346   @_Repeat
347   def testExclusiveBlocksExclusive(self):
348     self.sl.acquire()
349     self._addThread(target=self._doItExclusive)
350     self.assertRaises(Queue.Empty, self.done.get_nowait)
351     self.sl.release()
352     self._waitThreads()
353     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354
355   @_Repeat
356   def testExclusiveBlocksDelete(self):
357     self.sl.acquire()
358     self._addThread(target=self._doItDelete)
359     self.assertRaises(Queue.Empty, self.done.get_nowait)
360     self.sl.release()
361     self._waitThreads()
362     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363     self.sl = locking.SharedLock(self.sl.name)
364
365   @_Repeat
366   def testExclusiveBlocksSharer(self):
367     self.sl.acquire()
368     self._addThread(target=self._doItSharer)
369     self.assertRaises(Queue.Empty, self.done.get_nowait)
370     self.sl.release()
371     self._waitThreads()
372     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373
374   @_Repeat
375   def testSharerBlocksExclusive(self):
376     self.sl.acquire(shared=1)
377     self._addThread(target=self._doItExclusive)
378     self.assertRaises(Queue.Empty, self.done.get_nowait)
379     self.sl.release()
380     self._waitThreads()
381     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382
383   @_Repeat
384   def testSharerBlocksDelete(self):
385     self.sl.acquire(shared=1)
386     self._addThread(target=self._doItDelete)
387     self.assertRaises(Queue.Empty, self.done.get_nowait)
388     self.sl.release()
389     self._waitThreads()
390     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391     self.sl = locking.SharedLock(self.sl.name)
392
393   @_Repeat
394   def testWaitingExclusiveBlocksSharer(self):
395     """SKIPPED testWaitingExclusiveBlockSharer"""
396     return
397
398     self.sl.acquire(shared=1)
399     # the lock is acquired in shared mode...
400     self._addThread(target=self._doItExclusive)
401     # ...but now an exclusive is waiting...
402     self._addThread(target=self._doItSharer)
403     # ...so the sharer should be blocked as well
404     self.assertRaises(Queue.Empty, self.done.get_nowait)
405     self.sl.release()
406     self._waitThreads()
407     # The exclusive passed before
408     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410
411   @_Repeat
412   def testWaitingSharerBlocksExclusive(self):
413     """SKIPPED testWaitingSharerBlocksExclusive"""
414     return
415
416     self.sl.acquire()
417     # the lock is acquired in exclusive mode...
418     self._addThread(target=self._doItSharer)
419     # ...but now a sharer is waiting...
420     self._addThread(target=self._doItExclusive)
421     # ...the exclusive is waiting too...
422     self.assertRaises(Queue.Empty, self.done.get_nowait)
423     self.sl.release()
424     self._waitThreads()
425     # The sharer passed before
426     self.assertEqual(self.done.get_nowait(), 'SHR')
427     self.assertEqual(self.done.get_nowait(), 'EXC')
428
429   def testDelete(self):
430     self.sl.delete()
431     self.assertRaises(errors.LockError, self.sl.acquire)
432     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433     self.assertRaises(errors.LockError, self.sl.delete)
434
435   def testDeleteTimeout(self):
436     self.sl.delete(timeout=60)
437
438   def testNoDeleteIfSharer(self):
439     self.sl.acquire(shared=1)
440     self.assertRaises(AssertionError, self.sl.delete)
441
442   @_Repeat
443   def testDeletePendingSharersExclusiveDelete(self):
444     self.sl.acquire()
445     self._addThread(target=self._doItSharer)
446     self._addThread(target=self._doItSharer)
447     self._addThread(target=self._doItExclusive)
448     self._addThread(target=self._doItDelete)
449     self.sl.delete()
450     self._waitThreads()
451     # The threads who were pending return ERR
452     for _ in range(4):
453       self.assertEqual(self.done.get_nowait(), 'ERR')
454     self.sl = locking.SharedLock(self.sl.name)
455
456   @_Repeat
457   def testDeletePendingDeleteExclusiveSharers(self):
458     self.sl.acquire()
459     self._addThread(target=self._doItDelete)
460     self._addThread(target=self._doItExclusive)
461     self._addThread(target=self._doItSharer)
462     self._addThread(target=self._doItSharer)
463     self.sl.delete()
464     self._waitThreads()
465     # The two threads who were pending return both ERR
466     self.assertEqual(self.done.get_nowait(), 'ERR')
467     self.assertEqual(self.done.get_nowait(), 'ERR')
468     self.assertEqual(self.done.get_nowait(), 'ERR')
469     self.assertEqual(self.done.get_nowait(), 'ERR')
470     self.sl = locking.SharedLock(self.sl.name)
471
472   @_Repeat
473   def testExclusiveAcquireTimeout(self):
474     for shared in [0, 1]:
475       on_queue = threading.Event()
476       release_exclusive = threading.Event()
477
478       def _LockExclusive():
479         self.sl.acquire(shared=0, test_notify=on_queue.set)
480         self.done.put("A: start wait")
481         release_exclusive.wait()
482         self.done.put("A: end wait")
483         self.sl.release()
484
485       # Start thread to hold lock in exclusive mode
486       self._addThread(target=_LockExclusive)
487
488       # Wait for wait to begin
489       self.assertEqual(self.done.get(timeout=60), "A: start wait")
490
491       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492       # on the queue
493       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494                                       test_notify=release_exclusive.set))
495
496       self.done.put("got 2nd")
497       self.sl.release()
498
499       self._waitThreads()
500
501       self.assertEqual(self.done.get_nowait(), "A: end wait")
502       self.assertEqual(self.done.get_nowait(), "got 2nd")
503       self.assertRaises(Queue.Empty, self.done.get_nowait)
504
505   @_Repeat
506   def testAcquireExpiringTimeout(self):
507     def _AcquireWithTimeout(shared, timeout):
508       if not self.sl.acquire(shared=shared, timeout=timeout):
509         self.done.put("timeout")
510
511     for shared in [0, 1]:
512       # Lock exclusively
513       self.sl.acquire()
514
515       # Start shared acquires with timeout between 0 and 20 ms
516       for i in range(11):
517         self._addThread(target=_AcquireWithTimeout,
518                         args=(shared, i * 2.0 / 1000.0))
519
520       # Wait for threads to finish (makes sure the acquire timeout expires
521       # before releasing the lock)
522       self._waitThreads()
523
524       # Release lock
525       self.sl.release()
526
527       for _ in range(11):
528         self.assertEqual(self.done.get_nowait(), "timeout")
529
530       self.assertRaises(Queue.Empty, self.done.get_nowait)
531
532   @_Repeat
533   def testSharedSkipExclusiveAcquires(self):
534     # Tests whether shared acquires jump in front of exclusive acquires in the
535     # queue.
536
537     def _Acquire(shared, name, notify_ev, wait_ev):
538       if notify_ev:
539         notify_fn = notify_ev.set
540       else:
541         notify_fn = None
542
543       if wait_ev:
544         wait_ev.wait()
545
546       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547         return
548
549       self.done.put(name)
550       self.sl.release()
551
552     # Get exclusive lock while we fill the queue
553     self.sl.acquire()
554
555     shrcnt1 = 5
556     shrcnt2 = 7
557     shrcnt3 = 9
558     shrcnt4 = 2
559
560     # Add acquires using threading.Event for synchronization. They'll be
561     # acquired exactly in the order defined in this list.
562     acquires = (shrcnt1 * [(1, "shared 1")] +
563                 3 * [(0, "exclusive 1")] +
564                 shrcnt2 * [(1, "shared 2")] +
565                 shrcnt3 * [(1, "shared 3")] +
566                 shrcnt4 * [(1, "shared 4")] +
567                 3 * [(0, "exclusive 2")])
568
569     ev_cur = None
570     ev_prev = None
571
572     for args in acquires:
573       ev_cur = threading.Event()
574       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575       ev_prev = ev_cur
576
577     # Wait for last acquire to start
578     ev_prev.wait()
579
580     # Expect 6 pending exclusive acquires and 1 for all shared acquires
581     # together
582     self.assertEqual(self.sl._count_pending(), 7)
583
584     # Release exclusive lock and wait
585     self.sl.release()
586
587     self._waitThreads()
588
589     # Check sequence
590     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591       # Shared locks aren't guaranteed to be notified in order, but they'll be
592       # first
593       tmp = self.done.get_nowait()
594       if tmp == "shared 1":
595         shrcnt1 -= 1
596       elif tmp == "shared 2":
597         shrcnt2 -= 1
598       elif tmp == "shared 3":
599         shrcnt3 -= 1
600       elif tmp == "shared 4":
601         shrcnt4 -= 1
602     self.assertEqual(shrcnt1, 0)
603     self.assertEqual(shrcnt2, 0)
604     self.assertEqual(shrcnt3, 0)
605     self.assertEqual(shrcnt3, 0)
606
607     for _ in range(3):
608       self.assertEqual(self.done.get_nowait(), "exclusive 1")
609
610     for _ in range(3):
611       self.assertEqual(self.done.get_nowait(), "exclusive 2")
612
613     self.assertRaises(Queue.Empty, self.done.get_nowait)
614
615   def testIllegalDowngrade(self):
616     # Not yet acquired
617     self.assertRaises(AssertionError, self.sl.downgrade)
618
619     # Acquire in shared mode, downgrade should be no-op
620     self.assertTrue(self.sl.acquire(shared=1))
621     self.assertTrue(self.sl._is_owned(shared=1))
622     self.assertTrue(self.sl.downgrade())
623     self.assertTrue(self.sl._is_owned(shared=1))
624     self.sl.release()
625
626   def testDowngrade(self):
627     self.assertTrue(self.sl.acquire())
628     self.assertTrue(self.sl._is_owned(shared=0))
629     self.assertTrue(self.sl.downgrade())
630     self.assertTrue(self.sl._is_owned(shared=1))
631     self.sl.release()
632
633   @_Repeat
634   def testDowngradeJumpsAheadOfExclusive(self):
635     def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636       self.assertTrue(self.sl.acquire())
637       self.assertTrue(self.sl._is_owned(shared=0))
638       ev_got.set()
639       ev_downgrade.wait()
640       self.assertTrue(self.sl._is_owned(shared=0))
641       self.assertTrue(self.sl.downgrade())
642       self.assertTrue(self.sl._is_owned(shared=1))
643       ev_release.wait()
644       self.assertTrue(self.sl._is_owned(shared=1))
645       self.sl.release()
646
647     def _KeepExclusive2(ev_started, ev_release):
648       self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649       self.assertTrue(self.sl._is_owned(shared=0))
650       ev_release.wait()
651       self.assertTrue(self.sl._is_owned(shared=0))
652       self.sl.release()
653
654     def _KeepShared(ev_started, ev_got, ev_release):
655       self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656       self.assertTrue(self.sl._is_owned(shared=1))
657       ev_got.set()
658       ev_release.wait()
659       self.assertTrue(self.sl._is_owned(shared=1))
660       self.sl.release()
661
662     # Acquire lock in exclusive mode
663     ev_got_excl1 = threading.Event()
664     ev_downgrade_excl1 = threading.Event()
665     ev_release_excl1 = threading.Event()
666     th_excl1 = self._addThread(target=_KeepExclusive,
667                                args=(ev_got_excl1, ev_downgrade_excl1,
668                                      ev_release_excl1))
669     ev_got_excl1.wait()
670
671     # Start a second exclusive acquire
672     ev_started_excl2 = threading.Event()
673     ev_release_excl2 = threading.Event()
674     th_excl2 = self._addThread(target=_KeepExclusive2,
675                                args=(ev_started_excl2, ev_release_excl2))
676     ev_started_excl2.wait()
677
678     # Start shared acquires, will jump ahead of second exclusive acquire when
679     # first exclusive acquire downgrades
680     ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681     ev_release_shared = threading.Event()
682
683     th_shared = [self._addThread(target=_KeepShared,
684                                  args=(ev_started, ev_got, ev_release_shared))
685                  for (ev_started, ev_got) in ev_shared]
686
687     # Wait for all shared acquires to start
688     for (ev, _) in ev_shared:
689       ev.wait()
690
691     # Check lock information
692     self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693                      [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
694     [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
695     self.assertEqual([(pendmode, sorted(waiting))
696                       for (pendmode, waiting) in pending],
697                      [("exclusive", [th_excl2.getName()]),
698                       ("shared", sorted(th.getName() for th in th_shared))])
699
700     # Shared acquires won't start until the exclusive lock is downgraded
701     ev_downgrade_excl1.set()
702
703     # Wait for all shared acquires to be successful
704     for (_, ev) in ev_shared:
705       ev.wait()
706
707     # Check lock information again
708     self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
709                                               query.LQ_PENDING])),
710                      [(self.sl.name, "shared", None,
711                        [("exclusive", [th_excl2.getName()])])])
712     [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
713     self.assertEqual(set(owner), set([th_excl1.getName()] +
714                                      [th.getName() for th in th_shared]))
715
716     ev_release_excl1.set()
717     ev_release_excl2.set()
718     ev_release_shared.set()
719
720     self._waitThreads()
721
722     self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
723                                               query.LQ_PENDING])),
724                      [(self.sl.name, None, None, [])])
725
726   @_Repeat
727   def testMixedAcquireTimeout(self):
728     sync = threading.Event()
729
730     def _AcquireShared(ev):
731       if not self.sl.acquire(shared=1, timeout=None):
732         return
733
734       self.done.put("shared")
735
736       # Notify main thread
737       ev.set()
738
739       # Wait for notification from main thread
740       sync.wait()
741
742       # Release lock
743       self.sl.release()
744
745     acquires = []
746     for _ in range(3):
747       ev = threading.Event()
748       self._addThread(target=_AcquireShared, args=(ev, ))
749       acquires.append(ev)
750
751     # Wait for all acquires to finish
752     for i in acquires:
753       i.wait()
754
755     self.assertEqual(self.sl._count_pending(), 0)
756
757     # Try to get exclusive lock
758     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
759
760     # Acquire exclusive without timeout
761     exclsync = threading.Event()
762     exclev = threading.Event()
763
764     def _AcquireExclusive():
765       if not self.sl.acquire(shared=0):
766         return
767
768       self.done.put("exclusive")
769
770       # Notify main thread
771       exclev.set()
772
773       # Wait for notification from main thread
774       exclsync.wait()
775
776       self.sl.release()
777
778     self._addThread(target=_AcquireExclusive)
779
780     # Try to get exclusive lock
781     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
782
783     # Make all shared holders release their locks
784     sync.set()
785
786     # Wait for exclusive acquire to succeed
787     exclev.wait()
788
789     self.assertEqual(self.sl._count_pending(), 0)
790
791     # Try to get exclusive lock
792     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
793
794     def _AcquireSharedSimple():
795       if self.sl.acquire(shared=1, timeout=None):
796         self.done.put("shared2")
797         self.sl.release()
798
799     for _ in range(10):
800       self._addThread(target=_AcquireSharedSimple)
801
802     # Tell exclusive lock to release
803     exclsync.set()
804
805     # Wait for everything to finish
806     self._waitThreads()
807
808     self.assertEqual(self.sl._count_pending(), 0)
809
810     # Check sequence
811     for _ in range(3):
812       self.assertEqual(self.done.get_nowait(), "shared")
813
814     self.assertEqual(self.done.get_nowait(), "exclusive")
815
816     for _ in range(10):
817       self.assertEqual(self.done.get_nowait(), "shared2")
818
819     self.assertRaises(Queue.Empty, self.done.get_nowait)
820
821   def testPriority(self):
822     # Acquire in exclusive mode
823     self.assert_(self.sl.acquire(shared=0))
824
825     # Queue acquires
826     def _Acquire(prev, next, shared, priority, result):
827       prev.wait()
828       self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
829       try:
830         self.done.put(result)
831       finally:
832         self.sl.release()
833
834     counter = itertools.count(0)
835     priorities = range(-20, 30)
836     first = threading.Event()
837     prev = first
838
839     # Data structure:
840     # {
841     #   priority:
842     #     [(shared/exclusive, set(acquire names), set(pending threads)),
843     #      (shared/exclusive, ...),
844     #      ...,
845     #     ],
846     # }
847     perprio = {}
848
849     # References shared acquire per priority in L{perprio}. Data structure:
850     # {
851     #   priority: (shared=1, set(acquire names), set(pending threads)),
852     # }
853     prioshared = {}
854
855     for seed in [4979, 9523, 14902, 32440]:
856       # Use a deterministic random generator
857       rnd = random.Random(seed)
858       for priority in [rnd.choice(priorities) for _ in range(30)]:
859         modes = [0, 1]
860         rnd.shuffle(modes)
861         for shared in modes:
862           # Unique name
863           acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
864
865           ev = threading.Event()
866           thread = self._addThread(target=_Acquire,
867                                    args=(prev, ev, shared, priority, acqname))
868           prev = ev
869
870           # Record expected aqcuire, see above for structure
871           data = (shared, set([acqname]), set([thread]))
872           priolist = perprio.setdefault(priority, [])
873           if shared:
874             priosh = prioshared.get(priority, None)
875             if priosh:
876               # Shared acquires are merged
877               for i, j in zip(priosh[1:], data[1:]):
878                 i.update(j)
879               assert data[0] == priosh[0]
880             else:
881               prioshared[priority] = data
882               priolist.append(data)
883           else:
884             priolist.append(data)
885
886     # Start all acquires and wait for them
887     first.set()
888     prev.wait()
889
890     # Check lock information
891     self.assertEqual(self.sl.GetLockInfo(set()),
892                      [(self.sl.name, None, None, None)])
893     self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
894                      [(self.sl.name, "exclusive",
895                        [threading.currentThread().getName()], None)])
896
897     self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
898                             perprio)
899
900     # Let threads acquire the lock
901     self.sl.release()
902
903     # Wait for everything to finish
904     self._waitThreads()
905
906     self.assert_(self.sl._check_empty())
907
908     # Check acquires by priority
909     for acquires in [perprio[i] for i in sorted(perprio.keys())]:
910       for (_, names, _) in acquires:
911         # For shared acquires, the set will contain 1..n entries. For exclusive
912         # acquires only one.
913         while names:
914           names.remove(self.done.get_nowait())
915       self.assertFalse(compat.any(names for (_, names, _) in acquires))
916
917     self.assertRaises(Queue.Empty, self.done.get_nowait)
918
919   def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
920     self.assertEqual(name, self.sl.name)
921     self.assert_(mode is None)
922     self.assert_(owner is None)
923
924     self.assertEqual([(pendmode, sorted(waiting))
925                       for (pendmode, waiting) in pending],
926                      [(["exclusive", "shared"][int(bool(shared))],
927                        sorted(t.getName() for t in threads))
928                       for acquires in [perprio[i]
929                                        for i in sorted(perprio.keys())]
930                       for (shared, _, threads) in acquires])
931
932
933 class TestSharedLockInCondition(_ThreadedTestCase):
934   """SharedLock as a condition lock tests"""
935
936   def setUp(self):
937     _ThreadedTestCase.setUp(self)
938     self.sl = locking.SharedLock("TestSharedLockInCondition")
939     self.setCondition()
940
941   def setCondition(self):
942     self.cond = threading.Condition(self.sl)
943
944   def testKeepMode(self):
945     self.cond.acquire(shared=1)
946     self.assert_(self.sl._is_owned(shared=1))
947     self.cond.wait(0)
948     self.assert_(self.sl._is_owned(shared=1))
949     self.cond.release()
950     self.cond.acquire(shared=0)
951     self.assert_(self.sl._is_owned(shared=0))
952     self.cond.wait(0)
953     self.assert_(self.sl._is_owned(shared=0))
954     self.cond.release()
955
956
957 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
958   """SharedLock as a pipe condition lock tests"""
959
960   def setCondition(self):
961     self.cond = locking.PipeCondition(self.sl)
962
963
964 class TestSSynchronizedDecorator(_ThreadedTestCase):
965   """Shared Lock Synchronized decorator test"""
966
967   def setUp(self):
968     _ThreadedTestCase.setUp(self)
969
970   @locking.ssynchronized(_decoratorlock)
971   def _doItExclusive(self):
972     self.assert_(_decoratorlock._is_owned())
973     self.done.put('EXC')
974
975   @locking.ssynchronized(_decoratorlock, shared=1)
976   def _doItSharer(self):
977     self.assert_(_decoratorlock._is_owned(shared=1))
978     self.done.put('SHR')
979
980   def testDecoratedFunctions(self):
981     self._doItExclusive()
982     self.assertFalse(_decoratorlock._is_owned())
983     self._doItSharer()
984     self.assertFalse(_decoratorlock._is_owned())
985
986   def testSharersCanCoexist(self):
987     _decoratorlock.acquire(shared=1)
988     threading.Thread(target=self._doItSharer).start()
989     self.assert_(self.done.get(True, 1))
990     _decoratorlock.release()
991
992   @_Repeat
993   def testExclusiveBlocksExclusive(self):
994     _decoratorlock.acquire()
995     self._addThread(target=self._doItExclusive)
996     # give it a bit of time to check that it's not actually doing anything
997     self.assertRaises(Queue.Empty, self.done.get_nowait)
998     _decoratorlock.release()
999     self._waitThreads()
1000     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1001
1002   @_Repeat
1003   def testExclusiveBlocksSharer(self):
1004     _decoratorlock.acquire()
1005     self._addThread(target=self._doItSharer)
1006     self.assertRaises(Queue.Empty, self.done.get_nowait)
1007     _decoratorlock.release()
1008     self._waitThreads()
1009     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1010
1011   @_Repeat
1012   def testSharerBlocksExclusive(self):
1013     _decoratorlock.acquire(shared=1)
1014     self._addThread(target=self._doItExclusive)
1015     self.assertRaises(Queue.Empty, self.done.get_nowait)
1016     _decoratorlock.release()
1017     self._waitThreads()
1018     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1019
1020
1021 class TestLockSet(_ThreadedTestCase):
1022   """LockSet tests"""
1023
1024   def setUp(self):
1025     _ThreadedTestCase.setUp(self)
1026     self._setUpLS()
1027
1028   def _setUpLS(self):
1029     """Helper to (re)initialize the lock set"""
1030     self.resources = ['one', 'two', 'three']
1031     self.ls = locking.LockSet(self.resources, "TestLockSet")
1032
1033   def testResources(self):
1034     self.assertEquals(self.ls._names(), set(self.resources))
1035     newls = locking.LockSet([], "TestLockSet.testResources")
1036     self.assertEquals(newls._names(), set())
1037
1038   def testAcquireRelease(self):
1039     self.assert_(self.ls.acquire('one'))
1040     self.assertEquals(self.ls._list_owned(), set(['one']))
1041     self.ls.release()
1042     self.assertEquals(self.ls._list_owned(), set())
1043     self.assertEquals(self.ls.acquire(['one']), set(['one']))
1044     self.assertEquals(self.ls._list_owned(), set(['one']))
1045     self.ls.release()
1046     self.assertEquals(self.ls._list_owned(), set())
1047     self.ls.acquire(['one', 'two', 'three'])
1048     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1049     self.ls.release('one')
1050     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
1051     self.ls.release(['three'])
1052     self.assertEquals(self.ls._list_owned(), set(['two']))
1053     self.ls.release()
1054     self.assertEquals(self.ls._list_owned(), set())
1055     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1056     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
1057     self.ls.release()
1058     self.assertEquals(self.ls._list_owned(), set())
1059
1060   def testNoDoubleAcquire(self):
1061     self.ls.acquire('one')
1062     self.assertRaises(AssertionError, self.ls.acquire, 'one')
1063     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1064     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1065     self.ls.release()
1066     self.ls.acquire(['one', 'three'])
1067     self.ls.release('one')
1068     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1069     self.ls.release('three')
1070
1071   def testNoWrongRelease(self):
1072     self.assertRaises(AssertionError, self.ls.release)
1073     self.ls.acquire('one')
1074     self.assertRaises(AssertionError, self.ls.release, 'two')
1075
1076   def testAddRemove(self):
1077     self.ls.add('four')
1078     self.assertEquals(self.ls._list_owned(), set())
1079     self.assert_('four' in self.ls._names())
1080     self.ls.add(['five', 'six', 'seven'], acquired=1)
1081     self.assert_('five' in self.ls._names())
1082     self.assert_('six' in self.ls._names())
1083     self.assert_('seven' in self.ls._names())
1084     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
1085     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1086     self.assert_('five' not in self.ls._names())
1087     self.assert_('six' not in self.ls._names())
1088     self.assertEquals(self.ls._list_owned(), set(['seven']))
1089     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1090     self.ls.remove('seven')
1091     self.assert_('seven' not in self.ls._names())
1092     self.assertEquals(self.ls._list_owned(), set([]))
1093     self.ls.acquire(None, shared=1)
1094     self.assertRaises(AssertionError, self.ls.add, 'eight')
1095     self.ls.release()
1096     self.ls.acquire(None)
1097     self.ls.add('eight', acquired=1)
1098     self.assert_('eight' in self.ls._names())
1099     self.assert_('eight' in self.ls._list_owned())
1100     self.ls.add('nine')
1101     self.assert_('nine' in self.ls._names())
1102     self.assert_('nine' not in self.ls._list_owned())
1103     self.ls.release()
1104     self.ls.remove(['two'])
1105     self.assert_('two' not in self.ls._names())
1106     self.ls.acquire('three')
1107     self.assertEquals(self.ls.remove(['three']), ['three'])
1108     self.assert_('three' not in self.ls._names())
1109     self.assertEquals(self.ls.remove('three'), [])
1110     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1111     self.assert_('one' not in self.ls._names())
1112
1113   def testRemoveNonBlocking(self):
1114     self.ls.acquire('one')
1115     self.assertEquals(self.ls.remove('one'), ['one'])
1116     self.ls.acquire(['two', 'three'])
1117     self.assertEquals(self.ls.remove(['two', 'three']),
1118                       ['two', 'three'])
1119
1120   def testNoDoubleAdd(self):
1121     self.assertRaises(errors.LockError, self.ls.add, 'two')
1122     self.ls.add('four')
1123     self.assertRaises(errors.LockError, self.ls.add, 'four')
1124
1125   def testNoWrongRemoves(self):
1126     self.ls.acquire(['one', 'three'], shared=1)
1127     # Cannot remove 'two' while holding something which is not a superset
1128     self.assertRaises(AssertionError, self.ls.remove, 'two')
1129     # Cannot remove 'three' as we are sharing it
1130     self.assertRaises(AssertionError, self.ls.remove, 'three')
1131
1132   def testAcquireSetLock(self):
1133     # acquire the set-lock exclusively
1134     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1135     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1136     self.assertEquals(self.ls._is_owned(), True)
1137     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1138     # I can still add/remove elements...
1139     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1140     self.assert_(self.ls.add('six'))
1141     self.ls.release()
1142     # share the set-lock
1143     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1144     # adding new elements is not possible
1145     self.assertRaises(AssertionError, self.ls.add, 'five')
1146     self.ls.release()
1147
1148   def testAcquireWithRepetitions(self):
1149     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1150                       set(['two', 'two', 'three']))
1151     self.ls.release(['two', 'two'])
1152     self.assertEquals(self.ls._list_owned(), set(['three']))
1153
1154   def testEmptyAcquire(self):
1155     # Acquire an empty list of locks...
1156     self.assertEquals(self.ls.acquire([]), set())
1157     self.assertEquals(self.ls._list_owned(), set())
1158     # New locks can still be addded
1159     self.assert_(self.ls.add('six'))
1160     # "re-acquiring" is not an issue, since we had really acquired nothing
1161     self.assertEquals(self.ls.acquire([], shared=1), set())
1162     self.assertEquals(self.ls._list_owned(), set())
1163     # We haven't really acquired anything, so we cannot release
1164     self.assertRaises(AssertionError, self.ls.release)
1165
1166   def _doLockSet(self, names, shared):
1167     try:
1168       self.ls.acquire(names, shared=shared)
1169       self.done.put('DONE')
1170       self.ls.release()
1171     except errors.LockError:
1172       self.done.put('ERR')
1173
1174   def _doAddSet(self, names):
1175     try:
1176       self.ls.add(names, acquired=1)
1177       self.done.put('DONE')
1178       self.ls.release()
1179     except errors.LockError:
1180       self.done.put('ERR')
1181
1182   def _doRemoveSet(self, names):
1183     self.done.put(self.ls.remove(names))
1184
1185   @_Repeat
1186   def testConcurrentSharedAcquire(self):
1187     self.ls.acquire(['one', 'two'], shared=1)
1188     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1189     self._waitThreads()
1190     self.assertEqual(self.done.get_nowait(), 'DONE')
1191     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1192     self._waitThreads()
1193     self.assertEqual(self.done.get_nowait(), 'DONE')
1194     self._addThread(target=self._doLockSet, args=('three', 1))
1195     self._waitThreads()
1196     self.assertEqual(self.done.get_nowait(), 'DONE')
1197     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1198     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1199     self.assertRaises(Queue.Empty, self.done.get_nowait)
1200     self.ls.release()
1201     self._waitThreads()
1202     self.assertEqual(self.done.get_nowait(), 'DONE')
1203     self.assertEqual(self.done.get_nowait(), 'DONE')
1204
1205   @_Repeat
1206   def testConcurrentExclusiveAcquire(self):
1207     self.ls.acquire(['one', 'two'])
1208     self._addThread(target=self._doLockSet, args=('three', 1))
1209     self._waitThreads()
1210     self.assertEqual(self.done.get_nowait(), 'DONE')
1211     self._addThread(target=self._doLockSet, args=('three', 0))
1212     self._waitThreads()
1213     self.assertEqual(self.done.get_nowait(), 'DONE')
1214     self.assertRaises(Queue.Empty, self.done.get_nowait)
1215     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1216     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1217     self._addThread(target=self._doLockSet, args=('one', 0))
1218     self._addThread(target=self._doLockSet, args=('one', 1))
1219     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1220     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1221     self.assertRaises(Queue.Empty, self.done.get_nowait)
1222     self.ls.release()
1223     self._waitThreads()
1224     for _ in range(6):
1225       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1226
1227   @_Repeat
1228   def testSimpleAcquireTimeoutExpiring(self):
1229     names = sorted(self.ls._names())
1230     self.assert_(len(names) >= 3)
1231
1232     # Get name of first lock
1233     first = names[0]
1234
1235     # Get name of last lock
1236     last = names.pop()
1237
1238     checks = [
1239       # Block first and try to lock it again
1240       (first, first),
1241
1242       # Block last and try to lock all locks
1243       (None, first),
1244
1245       # Block last and try to lock it again
1246       (last, last),
1247       ]
1248
1249     for (wanted, block) in checks:
1250       # Lock in exclusive mode
1251       self.assert_(self.ls.acquire(block, shared=0))
1252
1253       def _AcquireOne():
1254         # Try to get the same lock again with a timeout (should never succeed)
1255         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1256         if acquired:
1257           self.done.put("acquired")
1258           self.ls.release()
1259         else:
1260           self.assert_(acquired is None)
1261           self.assertFalse(self.ls._list_owned())
1262           self.assertFalse(self.ls._is_owned())
1263           self.done.put("not acquired")
1264
1265       self._addThread(target=_AcquireOne)
1266
1267       # Wait for timeout in thread to expire
1268       self._waitThreads()
1269
1270       # Release exclusive lock again
1271       self.ls.release()
1272
1273       self.assertEqual(self.done.get_nowait(), "not acquired")
1274       self.assertRaises(Queue.Empty, self.done.get_nowait)
1275
1276   @_Repeat
1277   def testDelayedAndExpiringLockAcquire(self):
1278     self._setUpLS()
1279     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1280
1281     for expire in (False, True):
1282       names = sorted(self.ls._names())
1283       self.assertEqual(len(names), 8)
1284
1285       lock_ev = dict([(i, threading.Event()) for i in names])
1286
1287       # Lock all in exclusive mode
1288       self.assert_(self.ls.acquire(names, shared=0))
1289
1290       if expire:
1291         # We'll wait at least 300ms per lock
1292         lockwait = len(names) * [0.3]
1293
1294         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1295         # this gives us up to 2.4s to fail.
1296         lockall_timeout = 0.4
1297       else:
1298         # This should finish rather quickly
1299         lockwait = None
1300         lockall_timeout = len(names) * 5.0
1301
1302       def _LockAll():
1303         def acquire_notification(name):
1304           if not expire:
1305             self.done.put("getting %s" % name)
1306
1307           # Kick next lock
1308           lock_ev[name].set()
1309
1310         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1311                            test_notify=acquire_notification):
1312           self.done.put("got all")
1313           self.ls.release()
1314         else:
1315           self.done.put("timeout on all")
1316
1317         # Notify all locks
1318         for ev in lock_ev.values():
1319           ev.set()
1320
1321       t = self._addThread(target=_LockAll)
1322
1323       for idx, name in enumerate(names):
1324         # Wait for actual acquire on this lock to start
1325         lock_ev[name].wait(10.0)
1326
1327         if expire and t.isAlive():
1328           # Wait some time after getting the notification to make sure the lock
1329           # acquire will expire
1330           SafeSleep(lockwait[idx])
1331
1332         self.ls.release(names=name)
1333
1334       self.assertFalse(self.ls._list_owned())
1335
1336       self._waitThreads()
1337
1338       if expire:
1339         # Not checking which locks were actually acquired. Doing so would be
1340         # too timing-dependant.
1341         self.assertEqual(self.done.get_nowait(), "timeout on all")
1342       else:
1343         for i in names:
1344           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1345         self.assertEqual(self.done.get_nowait(), "got all")
1346       self.assertRaises(Queue.Empty, self.done.get_nowait)
1347
1348   @_Repeat
1349   def testConcurrentRemove(self):
1350     self.ls.add('four')
1351     self.ls.acquire(['one', 'two', 'four'])
1352     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1353     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1354     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1355     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1356     self.assertRaises(Queue.Empty, self.done.get_nowait)
1357     self.ls.remove('one')
1358     self.ls.release()
1359     self._waitThreads()
1360     for i in range(4):
1361       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1362     self.ls.add(['five', 'six'], acquired=1)
1363     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1364     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1365     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1366     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1367     self.ls.remove('five')
1368     self.ls.release()
1369     self._waitThreads()
1370     for i in range(4):
1371       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1372     self.ls.acquire(['three', 'four'])
1373     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1374     self.assertRaises(Queue.Empty, self.done.get_nowait)
1375     self.ls.remove('four')
1376     self._waitThreads()
1377     self.assertEqual(self.done.get_nowait(), ['six'])
1378     self._addThread(target=self._doRemoveSet, args=(['two']))
1379     self._waitThreads()
1380     self.assertEqual(self.done.get_nowait(), ['two'])
1381     self.ls.release()
1382     # reset lockset
1383     self._setUpLS()
1384
1385   @_Repeat
1386   def testConcurrentSharedSetLock(self):
1387     # share the set-lock...
1388     self.ls.acquire(None, shared=1)
1389     # ...another thread can share it too
1390     self._addThread(target=self._doLockSet, args=(None, 1))
1391     self._waitThreads()
1392     self.assertEqual(self.done.get_nowait(), 'DONE')
1393     # ...or just share some elements
1394     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1395     self._waitThreads()
1396     self.assertEqual(self.done.get_nowait(), 'DONE')
1397     # ...but not add new ones or remove any
1398     t = self._addThread(target=self._doAddSet, args=(['nine']))
1399     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1400     self.assertRaises(Queue.Empty, self.done.get_nowait)
1401     # this just releases the set-lock
1402     self.ls.release([])
1403     t.join(60)
1404     self.assertEqual(self.done.get_nowait(), 'DONE')
1405     # release the lock on the actual elements so remove() can proceed too
1406     self.ls.release()
1407     self._waitThreads()
1408     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1409     # reset lockset
1410     self._setUpLS()
1411
1412   @_Repeat
1413   def testConcurrentExclusiveSetLock(self):
1414     # acquire the set-lock...
1415     self.ls.acquire(None, shared=0)
1416     # ...no one can do anything else
1417     self._addThread(target=self._doLockSet, args=(None, 1))
1418     self._addThread(target=self._doLockSet, args=(None, 0))
1419     self._addThread(target=self._doLockSet, args=(['three'], 0))
1420     self._addThread(target=self._doLockSet, args=(['two'], 1))
1421     self._addThread(target=self._doAddSet, args=(['nine']))
1422     self.assertRaises(Queue.Empty, self.done.get_nowait)
1423     self.ls.release()
1424     self._waitThreads()
1425     for _ in range(5):
1426       self.assertEqual(self.done.get(True, 1), 'DONE')
1427     # cleanup
1428     self._setUpLS()
1429
1430   @_Repeat
1431   def testConcurrentSetLockAdd(self):
1432     self.ls.acquire('one')
1433     # Another thread wants the whole SetLock
1434     self._addThread(target=self._doLockSet, args=(None, 0))
1435     self._addThread(target=self._doLockSet, args=(None, 1))
1436     self.assertRaises(Queue.Empty, self.done.get_nowait)
1437     self.assertRaises(AssertionError, self.ls.add, 'four')
1438     self.ls.release()
1439     self._waitThreads()
1440     self.assertEqual(self.done.get_nowait(), 'DONE')
1441     self.assertEqual(self.done.get_nowait(), 'DONE')
1442     self.ls.acquire(None)
1443     self._addThread(target=self._doLockSet, args=(None, 0))
1444     self._addThread(target=self._doLockSet, args=(None, 1))
1445     self.assertRaises(Queue.Empty, self.done.get_nowait)
1446     self.ls.add('four')
1447     self.ls.add('five', acquired=1)
1448     self.ls.add('six', acquired=1, shared=1)
1449     self.assertEquals(self.ls._list_owned(),
1450       set(['one', 'two', 'three', 'five', 'six']))
1451     self.assertEquals(self.ls._is_owned(), True)
1452     self.assertEquals(self.ls._names(),
1453       set(['one', 'two', 'three', 'four', 'five', 'six']))
1454     self.ls.release()
1455     self._waitThreads()
1456     self.assertEqual(self.done.get_nowait(), 'DONE')
1457     self.assertEqual(self.done.get_nowait(), 'DONE')
1458     self._setUpLS()
1459
1460   @_Repeat
1461   def testEmptyLockSet(self):
1462     # get the set-lock
1463     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1464     # now empty it...
1465     self.ls.remove(['one', 'two', 'three'])
1466     # and adds/locks by another thread still wait
1467     self._addThread(target=self._doAddSet, args=(['nine']))
1468     self._addThread(target=self._doLockSet, args=(None, 1))
1469     self._addThread(target=self._doLockSet, args=(None, 0))
1470     self.assertRaises(Queue.Empty, self.done.get_nowait)
1471     self.ls.release()
1472     self._waitThreads()
1473     for _ in range(3):
1474       self.assertEqual(self.done.get_nowait(), 'DONE')
1475     # empty it again...
1476     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1477     # now share it...
1478     self.assertEqual(self.ls.acquire(None, shared=1), set())
1479     # other sharers can go, adds still wait
1480     self._addThread(target=self._doLockSet, args=(None, 1))
1481     self._waitThreads()
1482     self.assertEqual(self.done.get_nowait(), 'DONE')
1483     self._addThread(target=self._doAddSet, args=(['nine']))
1484     self.assertRaises(Queue.Empty, self.done.get_nowait)
1485     self.ls.release()
1486     self._waitThreads()
1487     self.assertEqual(self.done.get_nowait(), 'DONE')
1488     self._setUpLS()
1489
1490   def testAcquireWithNamesDowngrade(self):
1491     self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1492     self.assertTrue(self.ls._is_owned())
1493     self.assertFalse(self.ls._get_lock()._is_owned())
1494     self.ls.release()
1495     self.assertFalse(self.ls._is_owned())
1496     self.assertFalse(self.ls._get_lock()._is_owned())
1497     # Can't downgrade after releasing
1498     self.assertRaises(AssertionError, self.ls.downgrade, "two")
1499
1500   def testDowngrade(self):
1501     # Not owning anything, must raise an exception
1502     self.assertFalse(self.ls._is_owned())
1503     self.assertRaises(AssertionError, self.ls.downgrade)
1504
1505     self.assertFalse(compat.any(i._is_owned()
1506                                 for i in self.ls._get_lockdict().values()))
1507
1508     self.assertEquals(self.ls.acquire(None, shared=0),
1509                       set(["one", "two", "three"]))
1510     self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1511
1512     self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1513     self.assertTrue(compat.all(i._is_owned(shared=0)
1514                                for i in self.ls._get_lockdict().values()))
1515
1516     # Start downgrading locks
1517     self.assertTrue(self.ls.downgrade(names=["one"]))
1518     self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1519     self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1520                                for name, lock in
1521                                  self.ls._get_lockdict().items()))
1522
1523     self.assertTrue(self.ls.downgrade(names="two"))
1524     self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1525     should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1526     self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1527                                for name, lock in
1528                                  self.ls._get_lockdict().items()))
1529
1530     # Downgrading the last exclusive lock to shared must downgrade the
1531     # lockset-internal lock too
1532     self.assertTrue(self.ls.downgrade(names="three"))
1533     self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1534     self.assertTrue(compat.all(i._is_owned(shared=1)
1535                                for i in self.ls._get_lockdict().values()))
1536
1537     # Downgrading a shared lock must be a no-op
1538     self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1539     self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1540     self.assertTrue(compat.all(i._is_owned(shared=1)
1541                                for i in self.ls._get_lockdict().values()))
1542
1543     self.ls.release()
1544
1545   def testPriority(self):
1546     def _Acquire(prev, next, name, priority, success_fn):
1547       prev.wait()
1548       self.assert_(self.ls.acquire(name, shared=0,
1549                                    priority=priority,
1550                                    test_notify=lambda _: next.set()))
1551       try:
1552         success_fn()
1553       finally:
1554         self.ls.release()
1555
1556     # Get all in exclusive mode
1557     self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1558
1559     done_two = Queue.Queue(0)
1560
1561     first = threading.Event()
1562     prev = first
1563
1564     acquires = [("one", prio, self.done) for prio in range(1, 33)]
1565     acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1566
1567     # Use a deterministic random generator
1568     random.Random(741).shuffle(acquires)
1569
1570     for (name, prio, done) in acquires:
1571       ev = threading.Event()
1572       self._addThread(target=_Acquire,
1573                       args=(prev, ev, name, prio,
1574                             compat.partial(done.put, "Prio%s" % prio)))
1575       prev = ev
1576
1577     # Start acquires
1578     first.set()
1579
1580     # Wait for last acquire to start
1581     prev.wait()
1582
1583     # Let threads acquire locks
1584     self.ls.release()
1585
1586     # Wait for threads to finish
1587     self._waitThreads()
1588
1589     for i in range(1, 33):
1590       self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1591       self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1592
1593     self.assertRaises(Queue.Empty, self.done.get_nowait)
1594     self.assertRaises(Queue.Empty, done_two.get_nowait)
1595
1596
1597 class TestGanetiLockManager(_ThreadedTestCase):
1598
1599   def setUp(self):
1600     _ThreadedTestCase.setUp(self)
1601     self.nodes=['n1', 'n2']
1602     self.nodegroups=['g1', 'g2']
1603     self.instances=['i1', 'i2', 'i3']
1604     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1605                                         self.instances)
1606
1607   def tearDown(self):
1608     # Don't try this at home...
1609     locking.GanetiLockManager._instance = None
1610
1611   def testLockingConstants(self):
1612     # The locking library internally cheats by assuming its constants have some
1613     # relationships with each other. Check those hold true.
1614     # This relationship is also used in the Processor to recursively acquire
1615     # the right locks. Again, please don't break it.
1616     for i in range(len(locking.LEVELS)):
1617       self.assertEqual(i, locking.LEVELS[i])
1618
1619   def testDoubleGLFails(self):
1620     self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1621
1622   def testLockNames(self):
1623     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1624     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1625     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1626                      set(self.nodegroups))
1627     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1628                      set(self.instances))
1629
1630   def testInitAndResources(self):
1631     locking.GanetiLockManager._instance = None
1632     self.GL = locking.GanetiLockManager([], [], [])
1633     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1634     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1635     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1636     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1637
1638     locking.GanetiLockManager._instance = None
1639     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1640     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1641     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1642     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1643                                     set(self.nodegroups))
1644     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1645
1646     locking.GanetiLockManager._instance = None
1647     self.GL = locking.GanetiLockManager([], [], self.instances)
1648     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1649     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1650     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1651     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1652                      set(self.instances))
1653
1654   def testAcquireRelease(self):
1655     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1656     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1657     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1658     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1659     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1660     self.GL.release(locking.LEVEL_NODE, ['n2'])
1661     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1662     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1663     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1664     self.GL.release(locking.LEVEL_NODE)
1665     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1666     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1667     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1668     self.GL.release(locking.LEVEL_NODEGROUP)
1669     self.GL.release(locking.LEVEL_INSTANCE)
1670     self.assertRaises(errors.LockError, self.GL.acquire,
1671                       locking.LEVEL_INSTANCE, ['i5'])
1672     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1673     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1674
1675   def testAcquireWholeSets(self):
1676     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1677     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1678                       set(self.instances))
1679     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1680                       set(self.instances))
1681     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1682                       set(self.nodegroups))
1683     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1684                       set(self.nodegroups))
1685     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1686                       set(self.nodes))
1687     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1688                       set(self.nodes))
1689     self.GL.release(locking.LEVEL_NODE)
1690     self.GL.release(locking.LEVEL_NODEGROUP)
1691     self.GL.release(locking.LEVEL_INSTANCE)
1692     self.GL.release(locking.LEVEL_CLUSTER)
1693
1694   def testAcquireWholeAndPartial(self):
1695     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1696     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1697                       set(self.instances))
1698     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1699                       set(self.instances))
1700     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1701                       set(['n2']))
1702     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1703                       set(['n2']))
1704     self.GL.release(locking.LEVEL_NODE)
1705     self.GL.release(locking.LEVEL_INSTANCE)
1706     self.GL.release(locking.LEVEL_CLUSTER)
1707
1708   def testBGLDependency(self):
1709     self.assertRaises(AssertionError, self.GL.acquire,
1710                       locking.LEVEL_NODE, ['n1', 'n2'])
1711     self.assertRaises(AssertionError, self.GL.acquire,
1712                       locking.LEVEL_INSTANCE, ['i3'])
1713     self.assertRaises(AssertionError, self.GL.acquire,
1714                       locking.LEVEL_NODEGROUP, ['g1'])
1715     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1716     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1717     self.assertRaises(AssertionError, self.GL.release,
1718                       locking.LEVEL_CLUSTER, ['BGL'])
1719     self.assertRaises(AssertionError, self.GL.release,
1720                       locking.LEVEL_CLUSTER)
1721     self.GL.release(locking.LEVEL_NODE)
1722     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1723     self.assertRaises(AssertionError, self.GL.release,
1724                       locking.LEVEL_CLUSTER, ['BGL'])
1725     self.assertRaises(AssertionError, self.GL.release,
1726                       locking.LEVEL_CLUSTER)
1727     self.GL.release(locking.LEVEL_INSTANCE)
1728     self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1729     self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1730     self.assertRaises(AssertionError, self.GL.release,
1731                       locking.LEVEL_CLUSTER, ['BGL'])
1732     self.assertRaises(AssertionError, self.GL.release,
1733                       locking.LEVEL_CLUSTER)
1734     self.GL.release(locking.LEVEL_NODEGROUP)
1735     self.GL.release(locking.LEVEL_CLUSTER)
1736
1737   def testWrongOrder(self):
1738     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1739     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1740     self.assertRaises(AssertionError, self.GL.acquire,
1741                       locking.LEVEL_NODE, ['n1'])
1742     self.assertRaises(AssertionError, self.GL.acquire,
1743                       locking.LEVEL_NODEGROUP, ['g1'])
1744     self.assertRaises(AssertionError, self.GL.acquire,
1745                       locking.LEVEL_INSTANCE, ['i2'])
1746
1747   def testModifiableLevels(self):
1748     self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1749                       ['BGL2'])
1750     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1751     self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1752     self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1753     self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1754     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1755     self.GL.add(locking.LEVEL_NODE, ['n3'])
1756     self.GL.remove(locking.LEVEL_NODE, ['n1'])
1757     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1758     self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1759     self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1760     self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1761     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1762     self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1763                       ['BGL2'])
1764
1765   # Helper function to run as a thread that shared the BGL and then acquires
1766   # some locks at another level.
1767   def _doLock(self, level, names, shared):
1768     try:
1769       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1770       self.GL.acquire(level, names, shared=shared)
1771       self.done.put('DONE')
1772       self.GL.release(level)
1773       self.GL.release(locking.LEVEL_CLUSTER)
1774     except errors.LockError:
1775       self.done.put('ERR')
1776
1777   @_Repeat
1778   def testConcurrency(self):
1779     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1780     self._addThread(target=self._doLock,
1781                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1782     self._waitThreads()
1783     self.assertEqual(self.done.get_nowait(), 'DONE')
1784     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1785     self._addThread(target=self._doLock,
1786                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1787     self._waitThreads()
1788     self.assertEqual(self.done.get_nowait(), 'DONE')
1789     self._addThread(target=self._doLock,
1790                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1791     self.assertRaises(Queue.Empty, self.done.get_nowait)
1792     self.GL.release(locking.LEVEL_INSTANCE)
1793     self._waitThreads()
1794     self.assertEqual(self.done.get_nowait(), 'DONE')
1795     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1796     self._addThread(target=self._doLock,
1797                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1798     self._waitThreads()
1799     self.assertEqual(self.done.get_nowait(), 'DONE')
1800     self._addThread(target=self._doLock,
1801                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1802     self.assertRaises(Queue.Empty, self.done.get_nowait)
1803     self.GL.release(locking.LEVEL_INSTANCE)
1804     self._waitThreads()
1805     self.assertEqual(self.done.get(True, 1), 'DONE')
1806     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1807
1808
1809 class TestLockMonitor(_ThreadedTestCase):
1810   def setUp(self):
1811     _ThreadedTestCase.setUp(self)
1812     self.lm = locking.LockMonitor()
1813
1814   def testSingleThread(self):
1815     locks = []
1816
1817     for i in range(100):
1818       name = "TestLock%s" % i
1819       locks.append(locking.SharedLock(name, monitor=self.lm))
1820
1821     self.assertEqual(len(self.lm._locks), len(locks))
1822     result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1823     self.assertEqual(len(result.fields), 1)
1824     self.assertEqual(len(result.data), 100)
1825
1826     # Delete all locks
1827     del locks[:]
1828
1829     # The garbage collector might needs some time
1830     def _CheckLocks():
1831       if self.lm._locks:
1832         raise utils.RetryAgain()
1833
1834     utils.Retry(_CheckLocks, 0.1, 30.0)
1835
1836     self.assertFalse(self.lm._locks)
1837
1838   def testMultiThread(self):
1839     locks = []
1840
1841     def _CreateLock(prev, next, name):
1842       prev.wait()
1843       locks.append(locking.SharedLock(name, monitor=self.lm))
1844       if next:
1845         next.set()
1846
1847     expnames = []
1848
1849     first = threading.Event()
1850     prev = first
1851
1852     # Use a deterministic random generator
1853     for i in random.Random(4263).sample(range(100), 33):
1854       name = "MtTestLock%s" % i
1855       expnames.append(name)
1856
1857       ev = threading.Event()
1858       self._addThread(target=_CreateLock, args=(prev, ev, name))
1859       prev = ev
1860
1861     # Add locks
1862     first.set()
1863     self._waitThreads()
1864
1865     # Check order in which locks were added
1866     self.assertEqual([i.name for i in locks], expnames)
1867
1868     # Check query result
1869     result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1870     self.assert_(isinstance(result, dict))
1871     response = objects.QueryResponse.FromDict(result)
1872     self.assertEqual(response.data,
1873                      [[(constants.RS_NORMAL, name),
1874                        (constants.RS_NORMAL, None),
1875                        (constants.RS_NORMAL, None),
1876                        (constants.RS_NORMAL, [])]
1877                       for name in utils.NiceSort(expnames)])
1878     self.assertEqual(len(response.fields), 4)
1879     self.assertEqual(["name", "mode", "owner", "pending"],
1880                      [fdef.name for fdef in response.fields])
1881
1882     # Test exclusive acquire
1883     for tlock in locks[::4]:
1884       tlock.acquire(shared=0)
1885       try:
1886         def _GetExpResult(name):
1887           if tlock.name == name:
1888             return [(constants.RS_NORMAL, name),
1889                     (constants.RS_NORMAL, "exclusive"),
1890                     (constants.RS_NORMAL,
1891                      [threading.currentThread().getName()]),
1892                     (constants.RS_NORMAL, [])]
1893           return [(constants.RS_NORMAL, name),
1894                   (constants.RS_NORMAL, None),
1895                   (constants.RS_NORMAL, None),
1896                   (constants.RS_NORMAL, [])]
1897
1898         result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1899         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1900                          [_GetExpResult(name)
1901                           for name in utils.NiceSort(expnames)])
1902       finally:
1903         tlock.release()
1904
1905     # Test shared acquire
1906     def _Acquire(lock, shared, ev, notify):
1907       lock.acquire(shared=shared)
1908       try:
1909         notify.set()
1910         ev.wait()
1911       finally:
1912         lock.release()
1913
1914     for tlock1 in locks[::11]:
1915       for tlock2 in locks[::-15]:
1916         if tlock2 == tlock1:
1917           # Avoid deadlocks
1918           continue
1919
1920         for tlock3 in locks[::10]:
1921           if tlock3 in (tlock2, tlock1):
1922             # Avoid deadlocks
1923             continue
1924
1925           releaseev = threading.Event()
1926
1927           # Acquire locks
1928           acquireev = []
1929           tthreads1 = []
1930           for i in range(3):
1931             ev = threading.Event()
1932             tthreads1.append(self._addThread(target=_Acquire,
1933                                              args=(tlock1, 1, releaseev, ev)))
1934             acquireev.append(ev)
1935
1936           ev = threading.Event()
1937           tthread2 = self._addThread(target=_Acquire,
1938                                      args=(tlock2, 1, releaseev, ev))
1939           acquireev.append(ev)
1940
1941           ev = threading.Event()
1942           tthread3 = self._addThread(target=_Acquire,
1943                                      args=(tlock3, 0, releaseev, ev))
1944           acquireev.append(ev)
1945
1946           # Wait for all locks to be acquired
1947           for i in acquireev:
1948             i.wait()
1949
1950           # Check query result
1951           result = self.lm.QueryLocks(["name", "mode", "owner"])
1952           response = objects.QueryResponse.FromDict(result)
1953           for (name, mode, owner) in response.data:
1954             (name_status, name_value) = name
1955             (owner_status, owner_value) = owner
1956
1957             self.assertEqual(name_status, constants.RS_NORMAL)
1958             self.assertEqual(owner_status, constants.RS_NORMAL)
1959
1960             if name_value == tlock1.name:
1961               self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1962               self.assertEqual(set(owner_value),
1963                                set(i.getName() for i in tthreads1))
1964               continue
1965
1966             if name_value == tlock2.name:
1967               self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1968               self.assertEqual(owner_value, [tthread2.getName()])
1969               continue
1970
1971             if name_value == tlock3.name:
1972               self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1973               self.assertEqual(owner_value, [tthread3.getName()])
1974               continue
1975
1976             self.assert_(name_value in expnames)
1977             self.assertEqual(mode, (constants.RS_NORMAL, None))
1978             self.assert_(owner_value is None)
1979
1980           # Release locks again
1981           releaseev.set()
1982
1983           self._waitThreads()
1984
1985           result = self.lm.QueryLocks(["name", "mode", "owner"])
1986           self.assertEqual(objects.QueryResponse.FromDict(result).data,
1987                            [[(constants.RS_NORMAL, name),
1988                              (constants.RS_NORMAL, None),
1989                              (constants.RS_NORMAL, None)]
1990                             for name in utils.NiceSort(expnames)])
1991
1992   def testDelete(self):
1993     lock = locking.SharedLock("TestLock", monitor=self.lm)
1994
1995     self.assertEqual(len(self.lm._locks), 1)
1996     result = self.lm.QueryLocks(["name", "mode", "owner"])
1997     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1998                      [[(constants.RS_NORMAL, lock.name),
1999                        (constants.RS_NORMAL, None),
2000                        (constants.RS_NORMAL, None)]])
2001
2002     lock.delete()
2003
2004     result = self.lm.QueryLocks(["name", "mode", "owner"])
2005     self.assertEqual(objects.QueryResponse.FromDict(result).data,
2006                      [[(constants.RS_NORMAL, lock.name),
2007                        (constants.RS_NORMAL, "deleted"),
2008                        (constants.RS_NORMAL, None)]])
2009     self.assertEqual(len(self.lm._locks), 1)
2010
2011   def testPending(self):
2012     def _Acquire(lock, shared, prev, next):
2013       prev.wait()
2014
2015       lock.acquire(shared=shared, test_notify=next.set)
2016       try:
2017         pass
2018       finally:
2019         lock.release()
2020
2021     lock = locking.SharedLock("ExcLock", monitor=self.lm)
2022
2023     for shared in [0, 1]:
2024       lock.acquire()
2025       try:
2026         self.assertEqual(len(self.lm._locks), 1)
2027         result = self.lm.QueryLocks(["name", "mode", "owner"])
2028         self.assertEqual(objects.QueryResponse.FromDict(result).data,
2029                          [[(constants.RS_NORMAL, lock.name),
2030                            (constants.RS_NORMAL, "exclusive"),
2031                            (constants.RS_NORMAL,
2032                             [threading.currentThread().getName()])]])
2033
2034         threads = []
2035
2036         first = threading.Event()
2037         prev = first
2038
2039         for i in range(5):
2040           ev = threading.Event()
2041           threads.append(self._addThread(target=_Acquire,
2042                                           args=(lock, shared, prev, ev)))
2043           prev = ev
2044
2045         # Start acquires
2046         first.set()
2047
2048         # Wait for last acquire to start waiting
2049         prev.wait()
2050
2051         # NOTE: This works only because QueryLocks will acquire the
2052         # lock-internal lock again and won't be able to get the information
2053         # until it has the lock. By then the acquire should be registered in
2054         # SharedLock.__pending (otherwise it's a bug).
2055
2056         # All acquires are waiting now
2057         if shared:
2058           pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2059         else:
2060           pending = [("exclusive", [t.getName()]) for t in threads]
2061
2062         result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2063         self.assertEqual(objects.QueryResponse.FromDict(result).data,
2064                          [[(constants.RS_NORMAL, lock.name),
2065                            (constants.RS_NORMAL, "exclusive"),
2066                            (constants.RS_NORMAL,
2067                             [threading.currentThread().getName()]),
2068                            (constants.RS_NORMAL, pending)]])
2069
2070         self.assertEqual(len(self.lm._locks), 1)
2071       finally:
2072         lock.release()
2073
2074       self._waitThreads()
2075
2076       # No pending acquires
2077       result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2078       self.assertEqual(objects.QueryResponse.FromDict(result).data,
2079                        [[(constants.RS_NORMAL, lock.name),
2080                          (constants.RS_NORMAL, None),
2081                          (constants.RS_NORMAL, None),
2082                          (constants.RS_NORMAL, [])]])
2083
2084       self.assertEqual(len(self.lm._locks), 1)
2085
2086   def testDeleteAndRecreate(self):
2087     lname = "TestLock101923193"
2088
2089     # Create some locks with the same name and keep all references
2090     locks = [locking.SharedLock(lname, monitor=self.lm)
2091              for _ in range(5)]
2092
2093     self.assertEqual(len(self.lm._locks), len(locks))
2094
2095     result = self.lm.QueryLocks(["name", "mode", "owner"])
2096     self.assertEqual(objects.QueryResponse.FromDict(result).data,
2097                      [[(constants.RS_NORMAL, lname),
2098                        (constants.RS_NORMAL, None),
2099                        (constants.RS_NORMAL, None)]] * 5)
2100
2101     locks[2].delete()
2102
2103     # Check information order
2104     result = self.lm.QueryLocks(["name", "mode", "owner"])
2105     self.assertEqual(objects.QueryResponse.FromDict(result).data,
2106                      [[(constants.RS_NORMAL, lname),
2107                        (constants.RS_NORMAL, None),
2108                        (constants.RS_NORMAL, None)]] * 2 +
2109                      [[(constants.RS_NORMAL, lname),
2110                        (constants.RS_NORMAL, "deleted"),
2111                        (constants.RS_NORMAL, None)]] +
2112                      [[(constants.RS_NORMAL, lname),
2113                        (constants.RS_NORMAL, None),
2114                        (constants.RS_NORMAL, None)]] * 2)
2115
2116     locks[1].acquire(shared=0)
2117
2118     last_status = [
2119       [(constants.RS_NORMAL, lname),
2120        (constants.RS_NORMAL, None),
2121        (constants.RS_NORMAL, None)],
2122       [(constants.RS_NORMAL, lname),
2123        (constants.RS_NORMAL, "exclusive"),
2124        (constants.RS_NORMAL, [threading.currentThread().getName()])],
2125       [(constants.RS_NORMAL, lname),
2126        (constants.RS_NORMAL, "deleted"),
2127        (constants.RS_NORMAL, None)],
2128       [(constants.RS_NORMAL, lname),
2129        (constants.RS_NORMAL, None),
2130        (constants.RS_NORMAL, None)],
2131       [(constants.RS_NORMAL, lname),
2132        (constants.RS_NORMAL, None),
2133        (constants.RS_NORMAL, None)],
2134       ]
2135
2136     # Check information order
2137     result = self.lm.QueryLocks(["name", "mode", "owner"])
2138     self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2139
2140     self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2141     self.assertEqual(len(self.lm._locks), len(locks))
2142
2143     # Check lock deletion
2144     for idx in range(len(locks)):
2145       del locks[0]
2146       assert gc.isenabled()
2147       gc.collect()
2148       self.assertEqual(len(self.lm._locks), len(locks))
2149       result = self.lm.QueryLocks(["name", "mode", "owner"])
2150       self.assertEqual(objects.QueryResponse.FromDict(result).data,
2151                        last_status[idx + 1:])
2152
2153     # All locks should have been deleted
2154     assert not locks
2155     self.assertFalse(self.lm._locks)
2156
2157     result = self.lm.QueryLocks(["name", "mode", "owner"])
2158     self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2159
2160   class _FakeLock:
2161     def __init__(self):
2162       self._info = []
2163
2164     def AddResult(self, *args):
2165       self._info.append(args)
2166
2167     def CountPending(self):
2168       return len(self._info)
2169
2170     def GetLockInfo(self, requested):
2171       (exp_requested, result) = self._info.pop(0)
2172
2173       if exp_requested != requested:
2174         raise Exception("Requested information (%s) does not match"
2175                         " expectations (%s)" % (requested, exp_requested))
2176
2177       return result
2178
2179   def testMultipleResults(self):
2180     fl1 = self._FakeLock()
2181     fl2 = self._FakeLock()
2182
2183     self.lm.RegisterLock(fl1)
2184     self.lm.RegisterLock(fl2)
2185
2186     # Empty information
2187     for i in [fl1, fl2]:
2188       i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2189     result = self.lm.QueryLocks(["name", "mode", "owner"])
2190     self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2191     for i in [fl1, fl2]:
2192       self.assertEqual(i.CountPending(), 0)
2193
2194     # Check ordering
2195     for fn in [lambda x: x, reversed, sorted]:
2196       fl1.AddResult(set(), list(fn([
2197         ("aaa", None, None, None),
2198         ("bbb", None, None, None),
2199         ])))
2200       fl2.AddResult(set(), [])
2201       result = self.lm.QueryLocks(["name"])
2202       self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2203         [(constants.RS_NORMAL, "aaa")],
2204         [(constants.RS_NORMAL, "bbb")],
2205         ])
2206       for i in [fl1, fl2]:
2207         self.assertEqual(i.CountPending(), 0)
2208
2209       for fn2 in [lambda x: x, reversed, sorted]:
2210         fl1.AddResult(set([query.LQ_MODE]), list(fn([
2211           # Same name, but different information
2212           ("aaa", "mode0", None, None),
2213           ("aaa", "mode1", None, None),
2214           ("aaa", "mode2", None, None),
2215           ("aaa", "mode3", None, None),
2216           ])))
2217         fl2.AddResult(set([query.LQ_MODE]), [
2218           ("zzz", "end", None, None),
2219           ("000", "start", None, None),
2220           ] + list(fn2([
2221           ("aaa", "b200", None, None),
2222           ("aaa", "b300", None, None),
2223           ])))
2224         result = self.lm.QueryLocks(["name", "mode"])
2225         self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2226           [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2227           ] + list(fn([
2228           # Name is the same, so order must be equal to incoming order
2229           [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2230           [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2231           [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2232           [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2233           ])) + list(fn2([
2234           [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2235           [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2236           ])) + [
2237           [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2238           ])
2239         for i in [fl1, fl2]:
2240           self.assertEqual(i.CountPending(), 0)
2241
2242
2243 if __name__ == '__main__':
2244   testutils.GanetiTestProgram()