Update default instance kernel version
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import gc
32 import itertools
33
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
41
42 import testutils
43
44
# This is used to test the ssynchronized decorator.
# Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
48
49 #: List for looping tests
50 ITERATIONS = range(8)
51
52
53 def _Repeat(fn):
54   """Decorator for executing a function many times"""
55   def wrapper(*args, **kwargs):
56     for i in ITERATIONS:
57       fn(*args, **kwargs)
58   return wrapper
59
60
def SafeSleep(duration):
  """Sleep until at least C{duration} seconds have passed.

  The remaining time is re-evaluated after every C{time.sleep} call, so the
  function keeps sleeping should a single call return too early.

  @param duration: number of seconds to sleep; values <= 0 return immediately

  """
  start = time.time()
  remaining = start + duration - time.time()
  while remaining > 0.0:
    time.sleep(remaining)
    remaining = start + duration - time.time()
68
69
70 class _ThreadedTestCase(unittest.TestCase):
71   """Test class that supports adding/waiting on threads"""
72   def setUp(self):
73     unittest.TestCase.setUp(self)
74     self.done = Queue.Queue(0)
75     self.threads = []
76
77   def _addThread(self, *args, **kwargs):
78     """Create and remember a new thread"""
79     t = threading.Thread(*args, **kwargs)
80     self.threads.append(t)
81     t.start()
82     return t
83
84   def _waitThreads(self):
85     """Wait for all our threads to finish"""
86     for t in self.threads:
87       t.join(60)
88       self.failIf(t.isAlive())
89     self.threads = []
90
91
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the tests of all condition implementations.

  Subclasses call L{setUp} with the condition class under test.

  """

  def setUp(self, cls):
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _assertNotOwned(self):
    """Check that the condition is unowned and refuses wait/notifyAll."""
    self.assertFalse(self.cond._is_owned())
    self.assertRaises(RuntimeError, self.cond.wait, None)
    self.assertRaises(RuntimeError, self.cond.notifyAll)

  def _testAcquireRelease(self):
    """Verify ownership before, during and after holding the condition."""
    self._assertNotOwned()

    self.cond.acquire()
    self.assertTrue(self.cond._is_owned())
    self.cond.notifyAll()
    self.assertTrue(self.cond._is_owned())
    self.cond.release()

    self._assertNotOwned()

  def _testNotification(self):
    """Verify that a waiter is woken up by notifyAll from another thread."""
    def _Notifier():
      # Every step is reported so the main thread can check the ordering
      self.done.put("NE")
      self.cond.acquire()
      self.done.put("NA")
      self.cond.notifyAll()
      self.done.put("NN")
      self.cond.release()

    self.cond.acquire()
    self._addThread(target=_Notifier)
    self.assertEqual(self.done.get(True, 1), "NE")
    # The notifier can't get any further while we hold the condition
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.wait(None)
    for expected in ("NA", "NN"):
      self.assertEqual(self.done.get(True, 1), expected)
    self.assertTrue(self.cond._is_owned())
    self.cond.release()
    self.assertFalse(self.cond._is_owned())
134
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """Tests for L{locking.SingleNotifyPipeCondition}"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    # Waiting more than once on a condition which wasn't notified
    self.cond.acquire()
    for timeout in (0, 0.1):
      self.cond.wait(timeout)
    self.cond.release()

  def testNoNotifyReuse(self):
    # After a notification both wait and notifyAll must raise RuntimeError
    self.cond.acquire()
    self.cond.notifyAll()
    for (fn, args) in [(self.cond.wait, (None, )), (self.cond.notifyAll, ())]:
      self.assertRaises(RuntimeError, fn, *args)
    self.cond.release()
160
161
class TestPipeCondition(_ConditionTestCase):
  """Tests for L{locking.PipeCondition}"""

  def setUp(self):
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check that notifyAll wakes them.

    @param fn: thread function; must put "A" on the done queue once it holds
        the condition and "W" after its wait has finished

    """
    waiters = [self._addThread(target=fn) for _ in range(3)]

    # Wait for threads to be waiting
    for _ in waiters:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(len(self.cond._waiters), 3)
    self.assertEqual(self.cond._waiters, set(waiters))
    # This new thread can't acquire the lock, and thus call wait, before we
    # release it
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # We should now get 3 W and 1 A (for the new thread) in whatever order
    counts = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in counts:
        self.fail("Got %s on the done queue" % got)
      counts[got] += 1

    self.assertEqual(counts["W"], 3)
    self.assertEqual(counts["A"], 1)

    # Wake up the thread which was started last
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _WaitAndReport(self, timeout):
    """Thread function for L{_TestWait}, waiting with the given timeout."""
    self.cond.acquire()
    self.done.put("A")
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put("W")

  def testBlockingWait(self):
    self._TestWait(lambda: self._WaitAndReport(None))

  def testLongTimeoutWait(self):
    self._TestWait(lambda: self._WaitAndReport(15.0))

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with a timeout, then report C{check}."""
    self.cond.acquire()
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
    self._waitThreads()
    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262
263
264 class TestSharedLock(_ThreadedTestCase):
265   """SharedLock tests"""
266
  def setUp(self):
    # Every test gets its own, newly created lock instance
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLock")
270
271   def testSequenceAndOwnership(self):
272     self.assertFalse(self.sl.is_owned())
273     self.sl.acquire(shared=1)
274     self.assert_(self.sl.is_owned())
275     self.assert_(self.sl.is_owned(shared=1))
276     self.assertFalse(self.sl.is_owned(shared=0))
277     self.sl.release()
278     self.assertFalse(self.sl.is_owned())
279     self.sl.acquire()
280     self.assert_(self.sl.is_owned())
281     self.assertFalse(self.sl.is_owned(shared=1))
282     self.assert_(self.sl.is_owned(shared=0))
283     self.sl.release()
284     self.assertFalse(self.sl.is_owned())
285     self.sl.acquire(shared=1)
286     self.assert_(self.sl.is_owned())
287     self.assert_(self.sl.is_owned(shared=1))
288     self.assertFalse(self.sl.is_owned(shared=0))
289     self.sl.release()
290     self.assertFalse(self.sl.is_owned())
291
292   def testBooleanValue(self):
293     # semaphores are supposed to return a true value on a successful acquire
294     self.assert_(self.sl.acquire(shared=1))
295     self.sl.release()
296     self.assert_(self.sl.acquire())
297     self.sl.release()
298
  def testDoubleLockingStoE(self):
    # Holding the lock shared, an exclusive acquire by the same thread is
    # expected to raise AssertionError
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire)
302
  def testDoubleLockingEtoS(self):
    # Holding the lock exclusively, a shared acquire by the same thread is
    # expected to raise AssertionError
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306
  def testDoubleLockingStoS(self):
    # Holding the lock shared, another shared acquire by the same thread is
    # expected to raise AssertionError
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310
  def testDoubleLockingEtoE(self):
    # Holding the lock exclusively, another exclusive acquire by the same
    # thread is expected to raise AssertionError
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire)
314
315   # helper functions: called in a separate thread they acquire the lock, send
316   # their identifier on the done queue, then release it.
  def _doItSharer(self):
    """Acquire the lock in shared mode, report 'SHR' and release it.

    Reports 'ERR' on the done queue if any of the steps raises
    L{errors.LockError}.

    """
    try:
      self.sl.acquire(shared=1)
      self.done.put('SHR')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')
324
  def _doItExclusive(self):
    """Acquire the lock in exclusive mode, report 'EXC' and release it.

    Reports 'ERR' on the done queue if any of the steps raises
    L{errors.LockError}.

    """
    try:
      self.sl.acquire()
      self.done.put('EXC')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')
332
  def _doItDelete(self):
    """Delete the lock and report 'DEL'.

    Reports 'ERR' on the done queue if L{errors.LockError} is raised.

    """
    try:
      self.sl.delete()
      self.done.put('DEL')
    except errors.LockError:
      self.done.put('ERR')
339
340   def testSharersCanCoexist(self):
341     self.sl.acquire(shared=1)
342     threading.Thread(target=self._doItSharer).start()
343     self.assert_(self.done.get(True, 1))
344     self.sl.release()
345
346   @_Repeat
347   def testExclusiveBlocksExclusive(self):
348     self.sl.acquire()
349     self._addThread(target=self._doItExclusive)
350     self.assertRaises(Queue.Empty, self.done.get_nowait)
351     self.sl.release()
352     self._waitThreads()
353     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354
355   @_Repeat
356   def testExclusiveBlocksDelete(self):
357     self.sl.acquire()
358     self._addThread(target=self._doItDelete)
359     self.assertRaises(Queue.Empty, self.done.get_nowait)
360     self.sl.release()
361     self._waitThreads()
362     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363     self.sl = locking.SharedLock(self.sl.name)
364
365   @_Repeat
366   def testExclusiveBlocksSharer(self):
367     self.sl.acquire()
368     self._addThread(target=self._doItSharer)
369     self.assertRaises(Queue.Empty, self.done.get_nowait)
370     self.sl.release()
371     self._waitThreads()
372     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373
374   @_Repeat
375   def testSharerBlocksExclusive(self):
376     self.sl.acquire(shared=1)
377     self._addThread(target=self._doItExclusive)
378     self.assertRaises(Queue.Empty, self.done.get_nowait)
379     self.sl.release()
380     self._waitThreads()
381     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382
383   @_Repeat
384   def testSharerBlocksDelete(self):
385     self.sl.acquire(shared=1)
386     self._addThread(target=self._doItDelete)
387     self.assertRaises(Queue.Empty, self.done.get_nowait)
388     self.sl.release()
389     self._waitThreads()
390     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391     self.sl = locking.SharedLock(self.sl.name)
392
  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # Disabled: the early return makes everything below unreachable, hence
    # this test currently passes without checking anything
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410
  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # Disabled: the early return makes everything below unreachable, hence
    # this test currently passes without checking anything
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')
428
  def testDelete(self):
    # Once deleted, every further operation on the lock (acquire in either
    # mode, another delete) is expected to raise LockError
    self.sl.delete()
    self.assertRaises(errors.LockError, self.sl.acquire)
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
    self.assertRaises(errors.LockError, self.sl.delete)
434
  def testDeleteTimeout(self):
    # Deleting an unheld lock with a timeout given; only checks that the call
    # goes through without raising
    self.sl.delete(timeout=60)
437
  def testNoDeleteIfSharer(self):
    # Deleting the lock while holding it in shared mode is expected to raise
    # AssertionError
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.delete)
441
442   @_Repeat
443   def testDeletePendingSharersExclusiveDelete(self):
444     self.sl.acquire()
445     self._addThread(target=self._doItSharer)
446     self._addThread(target=self._doItSharer)
447     self._addThread(target=self._doItExclusive)
448     self._addThread(target=self._doItDelete)
449     self.sl.delete()
450     self._waitThreads()
451     # The threads who were pending return ERR
452     for _ in range(4):
453       self.assertEqual(self.done.get_nowait(), 'ERR')
454     self.sl = locking.SharedLock(self.sl.name)
455
456   @_Repeat
457   def testDeletePendingDeleteExclusiveSharers(self):
458     self.sl.acquire()
459     self._addThread(target=self._doItDelete)
460     self._addThread(target=self._doItExclusive)
461     self._addThread(target=self._doItSharer)
462     self._addThread(target=self._doItSharer)
463     self.sl.delete()
464     self._waitThreads()
465     # The two threads who were pending return both ERR
466     self.assertEqual(self.done.get_nowait(), 'ERR')
467     self.assertEqual(self.done.get_nowait(), 'ERR')
468     self.assertEqual(self.done.get_nowait(), 'ERR')
469     self.assertEqual(self.done.get_nowait(), 'ERR')
470     self.sl = locking.SharedLock(self.sl.name)
471
472   @_Repeat
473   def testExclusiveAcquireTimeout(self):
474     for shared in [0, 1]:
475       on_queue = threading.Event()
476       release_exclusive = threading.Event()
477
478       def _LockExclusive():
479         self.sl.acquire(shared=0, test_notify=on_queue.set)
480         self.done.put("A: start wait")
481         release_exclusive.wait()
482         self.done.put("A: end wait")
483         self.sl.release()
484
485       # Start thread to hold lock in exclusive mode
486       self._addThread(target=_LockExclusive)
487
488       # Wait for wait to begin
489       self.assertEqual(self.done.get(timeout=60), "A: start wait")
490
491       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492       # on the queue
493       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494                                       test_notify=release_exclusive.set))
495
496       self.done.put("got 2nd")
497       self.sl.release()
498
499       self._waitThreads()
500
501       self.assertEqual(self.done.get_nowait(), "A: end wait")
502       self.assertEqual(self.done.get_nowait(), "got 2nd")
503       self.assertRaises(Queue.Empty, self.done.get_nowait)
504
505   @_Repeat
506   def testAcquireExpiringTimeout(self):
507     def _AcquireWithTimeout(shared, timeout):
508       if not self.sl.acquire(shared=shared, timeout=timeout):
509         self.done.put("timeout")
510
511     for shared in [0, 1]:
512       # Lock exclusively
513       self.sl.acquire()
514
515       # Start shared acquires with timeout between 0 and 20 ms
516       for i in range(11):
517         self._addThread(target=_AcquireWithTimeout,
518                         args=(shared, i * 2.0 / 1000.0))
519
520       # Wait for threads to finish (makes sure the acquire timeout expires
521       # before releasing the lock)
522       self._waitThreads()
523
524       # Release lock
525       self.sl.release()
526
527       for _ in range(11):
528         self.assertEqual(self.done.get_nowait(), "timeout")
529
530       self.assertRaises(Queue.Empty, self.done.get_nowait)
531
532   @_Repeat
533   def testSharedSkipExclusiveAcquires(self):
534     # Tests whether shared acquires jump in front of exclusive acquires in the
535     # queue.
536
537     def _Acquire(shared, name, notify_ev, wait_ev):
538       if notify_ev:
539         notify_fn = notify_ev.set
540       else:
541         notify_fn = None
542
543       if wait_ev:
544         wait_ev.wait()
545
546       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547         return
548
549       self.done.put(name)
550       self.sl.release()
551
552     # Get exclusive lock while we fill the queue
553     self.sl.acquire()
554
555     shrcnt1 = 5
556     shrcnt2 = 7
557     shrcnt3 = 9
558     shrcnt4 = 2
559
560     # Add acquires using threading.Event for synchronization. They'll be
561     # acquired exactly in the order defined in this list.
562     acquires = (shrcnt1 * [(1, "shared 1")] +
563                 3 * [(0, "exclusive 1")] +
564                 shrcnt2 * [(1, "shared 2")] +
565                 shrcnt3 * [(1, "shared 3")] +
566                 shrcnt4 * [(1, "shared 4")] +
567                 3 * [(0, "exclusive 2")])
568
569     ev_cur = None
570     ev_prev = None
571
572     for args in acquires:
573       ev_cur = threading.Event()
574       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575       ev_prev = ev_cur
576
577     # Wait for last acquire to start
578     ev_prev.wait()
579
580     # Expect 6 pending exclusive acquires and 1 for all shared acquires
581     # together
582     self.assertEqual(self.sl._count_pending(), 7)
583
584     # Release exclusive lock and wait
585     self.sl.release()
586
587     self._waitThreads()
588
589     # Check sequence
590     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591       # Shared locks aren't guaranteed to be notified in order, but they'll be
592       # first
593       tmp = self.done.get_nowait()
594       if tmp == "shared 1":
595         shrcnt1 -= 1
596       elif tmp == "shared 2":
597         shrcnt2 -= 1
598       elif tmp == "shared 3":
599         shrcnt3 -= 1
600       elif tmp == "shared 4":
601         shrcnt4 -= 1
602     self.assertEqual(shrcnt1, 0)
603     self.assertEqual(shrcnt2, 0)
604     self.assertEqual(shrcnt3, 0)
605     self.assertEqual(shrcnt3, 0)
606
607     for _ in range(3):
608       self.assertEqual(self.done.get_nowait(), "exclusive 1")
609
610     for _ in range(3):
611       self.assertEqual(self.done.get_nowait(), "exclusive 2")
612
613     self.assertRaises(Queue.Empty, self.done.get_nowait)
614
  def testIllegalDowngrade(self):
    # Downgrading is only valid for a lock owned by the calling thread

    # Not yet acquired
    self.assertRaises(AssertionError, self.sl.downgrade)

    # Acquire in shared mode, downgrade should be no-op
    self.assertTrue(self.sl.acquire(shared=1))
    self.assertTrue(self.sl.is_owned(shared=1))
    self.assertTrue(self.sl.downgrade())
    self.assertTrue(self.sl.is_owned(shared=1))
    self.sl.release()
625
  def testDowngrade(self):
    # An exclusively held lock is owned in shared mode after a downgrade
    self.assertTrue(self.sl.acquire())
    self.assertTrue(self.sl.is_owned(shared=0))
    self.assertTrue(self.sl.downgrade())
    self.assertTrue(self.sl.is_owned(shared=1))
    self.sl.release()
632
  @_Repeat
  def testDowngradeJumpsAheadOfExclusive(self):
    # Scenario: one thread owns the lock exclusively, a second exclusive
    # acquire and five shared acquires are pending. When the owner downgrades
    # to shared mode, the shared acquires must succeed although the exclusive
    # acquire was started before them.

    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
      # Owner thread: acquire exclusively, downgrade when told to and release
      # when told to
      self.assertTrue(self.sl.acquire())
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_got.set()
      ev_downgrade.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.assertTrue(self.sl.downgrade())
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    def _KeepExclusive2(ev_started, ev_release):
      # Second exclusive acquire; ev_started is passed as test_notify callback
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.sl.release()

    def _KeepShared(ev_started, ev_got, ev_release):
      # Shared acquire; ev_got is set once the lock is actually held
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_got.set()
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    # Acquire lock in exclusive mode
    ev_got_excl1 = threading.Event()
    ev_downgrade_excl1 = threading.Event()
    ev_release_excl1 = threading.Event()
    th_excl1 = self._addThread(target=_KeepExclusive,
                               args=(ev_got_excl1, ev_downgrade_excl1,
                                     ev_release_excl1))
    ev_got_excl1.wait()

    # Start a second exclusive acquire
    ev_started_excl2 = threading.Event()
    ev_release_excl2 = threading.Event()
    th_excl2 = self._addThread(target=_KeepExclusive2,
                               args=(ev_started_excl2, ev_release_excl2))
    ev_started_excl2.wait()

    # Start shared acquires, will jump ahead of second exclusive acquire when
    # first exclusive acquire downgrades
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
    ev_release_shared = threading.Event()

    th_shared = [self._addThread(target=_KeepShared,
                                 args=(ev_started, ev_got, ev_release_shared))
                 for (ev_started, ev_got) in ev_shared]

    # Wait for all shared acquires to start
    for (ev, _) in ev_shared:
      ev.wait()

    # Check lock information: still owned exclusively by the first thread,
    # with the second exclusive acquire pending ahead of the shared ones
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [("exclusive", [th_excl2.getName()]),
                      ("shared", sorted(th.getName() for th in th_shared))])

    # Shared acquires won't start until the exclusive lock is downgraded
    ev_downgrade_excl1.set()

    # Wait for all shared acquires to be successful
    for (_, ev) in ev_shared:
      ev.wait()

    # Check lock information again: now in shared mode, owned by the first
    # thread and all shared threads, with only the exclusive acquire pending
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
                                              query.LQ_PENDING])),
                     [(self.sl.name, "shared", None,
                       [("exclusive", [th_excl2.getName()])])])
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
    self.assertEqual(set(owner), set([th_excl1.getName()] +
                                     [th.getName() for th in th_shared]))

    # Let all threads release the lock and finish
    ev_release_excl1.set()
    ev_release_excl2.set()
    ev_release_shared.set()

    self._waitThreads()

    # Nothing may be left owning or waiting for the lock
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
                                              query.LQ_PENDING])),
                     [(self.sl.name, None, None, [])])
725
  @_Repeat
  def testMixedAcquireTimeout(self):
    # Mixes blocking acquires in helper threads with timed exclusive acquires
    # in the main thread; the latter are expected to time out every time
    # because the lock is held by someone else

    # Tells the shared holders to release the lock
    sync = threading.Event()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    # These have to wait until the exclusive holder releases the lock
    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence: the three initial sharers, then the exclusive acquire,
    # then the ten late sharers
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)
820
  def testPriority(self):
    # Queues a large number of acquires with pseudo-random priorities while
    # the lock is held, then checks that they're served in ascending order of
    # their priority value

    # Acquire in exclusive mode
    self.assert_(self.sl.acquire(shared=0))

    # Queue acquires
    def _Acquire(prev, next, shared, priority, result):
      # Threads are chained through events: wait for the previous acquire
      # before starting, and hand the own event to the lock as test_notify
      # callback
      prev.wait()
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information; fields which weren't requested are None
    self.assertEqual(self.sl.GetLockInfo(set()),
                     [(self.sl.name, None, None, None)])
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive",
                       [threading.currentThread().getName()], None)])

    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
                            perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assert_(self.sl._check_empty())

    # Check acquires by priority
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one.
        while names:
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)
918
919   def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
920     self.assertEqual(name, self.sl.name)
921     self.assert_(mode is None)
922     self.assert_(owner is None)
923
924     self.assertEqual([(pendmode, sorted(waiting))
925                       for (pendmode, waiting) in pending],
926                      [(["exclusive", "shared"][int(bool(shared))],
927                        sorted(t.getName() for t in threads))
928                       for acquires in [perprio[i]
929                                        for i in sorted(perprio.keys())]
930                       for (shared, _, threads) in acquires])
931
932
class TestSharedLockInCondition(_ThreadedTestCase):
  """Tests using a SharedLock as the lock of a condition"""

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Create the condition under test; overridden by subclasses."""
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    # Waiting on the condition must not change the mode in which the lock is
    # held; checked for shared mode first, then for exclusive mode
    for shared in (1, 0):
      self.cond.acquire(shared=shared)
      self.assertTrue(self.sl.is_owned(shared=shared))
      self.cond.wait(0)
      self.assertTrue(self.sl.is_owned(shared=shared))
      self.cond.release()
955
956
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """Runs the inherited SharedLock condition tests with a PipeCondition"""

  def setCondition(self):
    """Creates a C{locking.PipeCondition} wrapping the shared lock"""
    pipe_cond = locking.PipeCondition(self.sl)
    self.cond = pipe_cond
962
963
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test

  All tests use the module-level C{_decoratorlock}. Whenever a test holds
  that lock itself, it is released in a C{finally} clause; a lock left behind
  by a failed assertion would otherwise still be held when the following
  tests run.

  """

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Reports 'EXC' while holding the decorator lock"""
    self.assertTrue(_decoratorlock.is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Reports 'SHR' while holding the decorator lock in shared mode"""
    self.assertTrue(_decoratorlock.is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    # The lock must be released again once a decorated function returned
    self._doItExclusive()
    self.assertFalse(_decoratorlock.is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock.is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    try:
      # Thread is tracked so it can be waited for before the test ends
      self._addThread(target=self._doItSharer)
      self.assertTrue(self.done.get(True, 1))
    finally:
      _decoratorlock.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItExclusive)
      # Nothing must have been reported while the lock is held here
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItSharer)
      # Nothing must have been reported while the lock is held here
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    try:
      self._addThread(target=self._doItExclusive)
      # Nothing must have been reported while the lock is held here
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'EXC')
1019
1020
class TestLockSet(_ThreadedTestCase):
  """LockSet tests

  Every test works on C{self.ls}, a lock set holding the locks 'one', 'two'
  and 'three'. Tests adding or removing locks and decorated with C{_Repeat}
  re-create the lock set via L{_setUpLS}, as C{setUp} isn't run between
  repetitions.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self._setUpLS()

  def _setUpLS(self):
    """Helper to (re)initialize the lock set"""
    self.resources = ['one', 'two', 'three']
    self.ls = locking.LockSet(self.resources, "TestLockSet")

  def testResources(self):
    self.assertEqual(self.ls._names(), set(self.resources))
    newls = locking.LockSet([], "TestLockSet.testResources")
    self.assertEqual(newls._names(), set())

  def testCheckOwnedUnknown(self):
    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
    for shared in [-1, 0, 1, 6378, 24255]:
      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
                                           shared=shared))

  def testCheckOwnedUnknownWhileHolding(self):
    self.assertFalse(self.ls.check_owned([]))
    self.ls.acquire("one", shared=1)
    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertFalse(self.ls.check_owned(["one", "two"]))
    self.assertRaises(errors.LockError, self.ls.check_owned,
                      ["one", "nonexist"])
    self.assertRaises(errors.LockError, self.ls.check_owned, "")
    self.ls.release()
    self.assertFalse(self.ls.check_owned([]))
    self.assertFalse(self.ls.check_owned("one"))

  def testAcquireRelease(self):
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    self.assertTrue(self.ls.acquire('one'))
    self.assertEqual(self.ls.list_owned(), set(['one']))
    self.assertTrue(self.ls.check_owned("one"))
    self.assertTrue(self.ls.check_owned("one", shared=0))
    self.assertFalse(self.ls.check_owned("one", shared=1))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    self.assertEqual(self.ls.acquire(['one']), set(['one']))
    self.assertEqual(self.ls.list_owned(), set(['one']))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    self.ls.acquire(['one', 'two', 'three'])
    self.assertEqual(self.ls.list_owned(), set(['one', 'two', 'three']))
    self.assertTrue(self.ls.check_owned(self.ls._names()))
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
    self.ls.release('one')
    self.assertFalse(self.ls.check_owned(["one"]))
    self.assertTrue(self.ls.check_owned(["two", "three"]))
    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
    self.assertEqual(self.ls.list_owned(), set(['two', 'three']))
    self.ls.release(['three'])
    self.assertEqual(self.ls.list_owned(), set(['two']))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    self.assertEqual(self.ls.acquire(['one', 'three']), set(['one', 'three']))
    self.assertEqual(self.ls.list_owned(), set(['one', 'three']))
    self.ls.release()
    self.assertEqual(self.ls.list_owned(), set())
    for name in self.ls._names():
      self.assertFalse(self.ls.check_owned(name))

  def testNoDoubleAcquire(self):
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
    self.ls.release()
    self.ls.acquire(['one', 'three'])
    self.ls.release('one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.ls.release('three')

  def testNoWrongRelease(self):
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.release, 'two')

  def testAddRemove(self):
    self.ls.add('four')
    self.assertEqual(self.ls.list_owned(), set())
    self.assertTrue('four' in self.ls._names())
    self.ls.add(['five', 'six', 'seven'], acquired=1)
    self.assertTrue('five' in self.ls._names())
    self.assertTrue('six' in self.ls._names())
    self.assertTrue('seven' in self.ls._names())
    self.assertEqual(self.ls.list_owned(), set(['five', 'six', 'seven']))
    self.assertEqual(self.ls.remove(['five', 'six']), ['five', 'six'])
    self.assertTrue('five' not in self.ls._names())
    self.assertTrue('six' not in self.ls._names())
    self.assertEqual(self.ls.list_owned(), set(['seven']))
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
    self.ls.remove('seven')
    self.assertTrue('seven' not in self.ls._names())
    self.assertEqual(self.ls.list_owned(), set([]))
    self.ls.acquire(None, shared=1)
    self.assertRaises(AssertionError, self.ls.add, 'eight')
    self.ls.release()
    self.ls.acquire(None)
    self.ls.add('eight', acquired=1)
    self.assertTrue('eight' in self.ls._names())
    self.assertTrue('eight' in self.ls.list_owned())
    self.ls.add('nine')
    self.assertTrue('nine' in self.ls._names())
    self.assertTrue('nine' not in self.ls.list_owned())
    self.ls.release()
    self.ls.remove(['two'])
    self.assertTrue('two' not in self.ls._names())
    self.ls.acquire('three')
    self.assertEqual(self.ls.remove(['three']), ['three'])
    self.assertTrue('three' not in self.ls._names())
    self.assertEqual(self.ls.remove('three'), [])
    self.assertEqual(self.ls.remove(['one', 'three', 'six']), ['one'])
    self.assertTrue('one' not in self.ls._names())

  def testRemoveNonBlocking(self):
    self.ls.acquire('one')
    self.assertEqual(self.ls.remove('one'), ['one'])
    self.ls.acquire(['two', 'three'])
    self.assertEqual(self.ls.remove(['two', 'three']),
                     ['two', 'three'])

  def testNoDoubleAdd(self):
    self.assertRaises(errors.LockError, self.ls.add, 'two')
    self.ls.add('four')
    self.assertRaises(errors.LockError, self.ls.add, 'four')

  def testNoWrongRemoves(self):
    self.ls.acquire(['one', 'three'], shared=1)
    # Cannot remove 'two' while holding something which is not a superset
    self.assertRaises(AssertionError, self.ls.remove, 'two')
    # Cannot remove 'three' as we are sharing it
    self.assertRaises(AssertionError, self.ls.remove, 'three')

  def testAcquireSetLock(self):
    # acquire the set-lock exclusively
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    self.assertEqual(self.ls.list_owned(), set(['one', 'two', 'three']))
    self.assertEqual(self.ls.is_owned(), True)
    self.assertEqual(self.ls._names(), set(['one', 'two', 'three']))
    # I can still add/remove elements...
    self.assertEqual(self.ls.remove(['two', 'three']), ['two', 'three'])
    self.assertTrue(self.ls.add('six'))
    self.ls.release()
    # share the set-lock
    self.assertEqual(self.ls.acquire(None, shared=1), set(['one', 'six']))
    # adding new elements is not possible
    self.assertRaises(AssertionError, self.ls.add, 'five')
    self.ls.release()

  def testAcquireWithRepetitions(self):
    self.assertEqual(self.ls.acquire(['two', 'two', 'three'], shared=1),
                     set(['two', 'two', 'three']))
    self.ls.release(['two', 'two'])
    self.assertEqual(self.ls.list_owned(), set(['three']))

  def testEmptyAcquire(self):
    # Acquire an empty list of locks...
    self.assertEqual(self.ls.acquire([]), set())
    self.assertEqual(self.ls.list_owned(), set())
    # New locks can still be added
    self.assertTrue(self.ls.add('six'))
    # "re-acquiring" is not an issue, since we had really acquired nothing
    self.assertEqual(self.ls.acquire([], shared=1), set())
    self.assertEqual(self.ls.list_owned(), set())
    # We haven't really acquired anything, so we cannot release
    self.assertRaises(AssertionError, self.ls.release)

  def _doLockSet(self, names, shared):
    """Thread target acquiring and releasing locks

    Puts 'DONE' on the result queue once the locks were acquired, 'ERR' if a
    C{errors.LockError} was raised.

    """
    try:
      self.ls.acquire(names, shared=shared)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doAddSet(self, names):
    """Thread target adding locks in acquired state and releasing them

    Puts 'DONE' on the result queue once the locks were added, 'ERR' if a
    C{errors.LockError} was raised.

    """
    try:
      self.ls.add(names, acquired=1)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doRemoveSet(self, names):
    """Thread target removing locks; puts the result on the result queue"""
    self.done.put(self.ls.remove(names))

  @_Repeat
  def testConcurrentSharedAcquire(self):
    self.ls.acquire(['one', 'two'], shared=1)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testConcurrentExclusiveAcquire(self):
    self.ls.acquire(['one', 'two'])
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 0))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._addThread(target=self._doLockSet, args=('one', 0))
    self._addThread(target=self._doLockSet, args=('one', 1))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(6):
      self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testSimpleAcquireTimeoutExpiring(self):
    names = sorted(self.ls._names())
    self.assertTrue(len(names) >= 3)

    # Get name of first lock
    first = names[0]

    # Get name of last lock
    last = names.pop()

    # Tuples of (lock(s) wanted by the thread, lock held by this thread)
    checks = [
      # Block first and try to lock it again
      (first, first),

      # Block first and try to lock all locks
      (None, first),

      # Block last and try to lock it again
      (last, last),
      ]

    for (wanted, block) in checks:
      # Lock in exclusive mode
      self.assertTrue(self.ls.acquire(block, shared=0))

      def _AcquireOne():
        # Try to get the same lock again with a timeout (should never succeed)
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
        if acquired:
          self.done.put("acquired")
          self.ls.release()
        else:
          self.assertTrue(acquired is None)
          self.assertFalse(self.ls.list_owned())
          self.assertFalse(self.ls.is_owned())
          self.done.put("not acquired")

      self._addThread(target=_AcquireOne)

      # Wait for timeout in thread to expire
      self._waitThreads()

      # Release exclusive lock again
      self.ls.release()

      self.assertEqual(self.done.get_nowait(), "not acquired")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testDelayedAndExpiringLockAcquire(self):
    self._setUpLS()
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])

    for expire in (False, True):
      names = sorted(self.ls._names())
      self.assertEqual(len(names), 8)

      lock_ev = dict([(i, threading.Event()) for i in names])

      # Lock all in exclusive mode
      self.assertTrue(self.ls.acquire(names, shared=0))

      if expire:
        # We'll wait at least 300ms per lock
        lockwait = len(names) * [0.3]

        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
        # this gives us up to 2.4s to fail.
        lockall_timeout = 0.4
      else:
        # This should finish rather quickly
        lockwait = None
        lockall_timeout = len(names) * 5.0

      def _LockAll():
        def acquire_notification(name):
          if not expire:
            self.done.put("getting %s" % name)

          # Kick next lock
          lock_ev[name].set()

        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                           test_notify=acquire_notification):
          self.done.put("got all")
          self.ls.release()
        else:
          self.done.put("timeout on all")

        # Notify all locks
        for ev in lock_ev.values():
          ev.set()

      t = self._addThread(target=_LockAll)

      for idx, name in enumerate(names):
        # Wait for actual acquire on this lock to start
        lock_ev[name].wait(10.0)

        if expire and t.isAlive():
          # Wait some time after getting the notification to make sure the lock
          # acquire will expire
          SafeSleep(lockwait[idx])

        self.ls.release(names=name)

      self.assertFalse(self.ls.list_owned())

      self._waitThreads()

      if expire:
        # Not checking which locks were actually acquired. Doing so would be
        # too timing-dependent.
        self.assertEqual(self.done.get_nowait(), "timeout on all")
      else:
        for i in names:
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
        self.assertEqual(self.done.get_nowait(), "got all")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testConcurrentRemove(self):
    self.ls.add('four')
    self.ls.acquire(['one', 'two', 'four'])
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('one')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.ls.add(['five', 'six'], acquired=1)
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
    self.ls.remove('five')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(['three', 'four'])
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('four')
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['six'])
    # Trailing comma needed: "args" must be a tuple containing the name list
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    self.ls.release()
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentSharedSetLock(self):
    # share the set-lock...
    self.ls.acquire(None, shared=1)
    # ...another thread can share it too
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...or just share some elements
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...but not add new ones or remove any
    t = self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    # this just releases the set-lock
    self.ls.release([])
    t.join(60)
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # release the lock on the actual elements so remove() can proceed too
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentExclusiveSetLock(self):
    # acquire the set-lock...
    self.ls.acquire(None, shared=0)
    # ...no one can do anything else
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(['three'], 0))
    self._addThread(target=self._doLockSet, args=(['two'], 1))
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(5):
      self.assertEqual(self.done.get(True, 1), 'DONE')
    # cleanup
    self._setUpLS()

  @_Repeat
  def testConcurrentSetLockAdd(self):
    self.ls.acquire('one')
    # Another thread wants the whole SetLock
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(AssertionError, self.ls.add, 'four')
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(None)
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.add('four')
    self.ls.add('five', acquired=1)
    self.ls.add('six', acquired=1, shared=1)
    self.assertEqual(self.ls.list_owned(),
                     set(['one', 'two', 'three', 'five', 'six']))
    self.assertEqual(self.ls.is_owned(), True)
    self.assertEqual(self.ls._names(),
                     set(['one', 'two', 'three', 'four', 'five', 'six']))
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  @_Repeat
  def testEmptyLockSet(self):
    # get the set-lock
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    # now empty it...
    self.ls.remove(['one', 'two', 'three'])
    # and adds/locks by another thread still wait
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    # empty it again...
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
    # now share it...
    self.assertEqual(self.ls.acquire(None, shared=1), set())
    # other sharers can go, adds still wait
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  def testAcquireWithNamesDowngrade(self):
    self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
    self.assertTrue(self.ls.is_owned())
    self.assertFalse(self.ls._get_lock().is_owned())
    self.ls.release()
    self.assertFalse(self.ls.is_owned())
    self.assertFalse(self.ls._get_lock().is_owned())
    # Can't downgrade after releasing
    self.assertRaises(AssertionError, self.ls.downgrade, "two")

  def testDowngrade(self):
    # Not owning anything, must raise an exception
    self.assertFalse(self.ls.is_owned())
    self.assertRaises(AssertionError, self.ls.downgrade)

    self.assertFalse(compat.any(i.is_owned()
                                for i in self.ls._get_lockdict().values()))
    self.assertFalse(self.ls.check_owned(self.ls._names()))
    for name in self.ls._names():
      self.assertFalse(self.ls.check_owned(name))

    self.assertEqual(self.ls.acquire(None, shared=0),
                     set(["one", "two", "three"]))
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")

    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
    for name in self.ls._names():
      self.assertTrue(self.ls.check_owned(name))
      self.assertTrue(self.ls.check_owned(name, shared=0))
      self.assertFalse(self.ls.check_owned(name, shared=1))

    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    self.assertTrue(compat.all(i.is_owned(shared=0)
                               for i in self.ls._get_lockdict().values()))

    # Start downgrading locks
    self.assertTrue(self.ls.downgrade(names=["one"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertTrue(self.ls.check_owned("two", shared=0))
    self.assertTrue(self.ls.check_owned("three", shared=0))

    # Downgrade second lock
    self.assertTrue(self.ls.downgrade(names="two"))
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
                               for name, lock in
                                 self.ls._get_lockdict().items()))

    self.assertFalse(self.ls.check_owned("one", shared=0))
    self.assertTrue(self.ls.check_owned("one", shared=1))
    self.assertFalse(self.ls.check_owned("two", shared=0))
    self.assertTrue(self.ls.check_owned("two", shared=1))
    self.assertTrue(self.ls.check_owned("three", shared=0))

    # Downgrading the last exclusive lock to shared must downgrade the
    # lockset-internal lock too
    self.assertTrue(self.ls.downgrade(names="three"))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.assertTrue(compat.all(i.is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    # Verify owned locks
    for name in self.ls._names():
      self.assertTrue(self.ls.check_owned(name, shared=1))

    # Downgrading a shared lock must be a no-op
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.assertTrue(compat.all(i.is_owned(shared=1)
                               for i in self.ls._get_lockdict().values()))

    self.ls.release()

  def testPriority(self):
    def _Acquire(prev_ev, next_ev, name, priority, success_fn):
      # Threads are chained through events: each one starts its acquire only
      # after the previous thread was notified about its own acquire
      prev_ev.wait()
      self.assertTrue(self.ls.acquire(name, shared=0,
                                      priority=priority,
                                      test_notify=lambda _: next_ev.set()))
      try:
        success_fn()
      finally:
        self.ls.release()

    # Get all in exclusive mode
    self.assertTrue(self.ls.acquire(locking.ALL_SET, shared=0))

    done_two = Queue.Queue(0)

    first = threading.Event()
    prev = first

    acquires = [("one", prio, self.done) for prio in range(1, 33)]
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])

    # Use a deterministic random generator
    random.Random(741).shuffle(acquires)

    for (name, prio, done) in acquires:
      ev = threading.Event()
      self._addThread(target=_Acquire,
                      args=(prev, ev, name, prio,
                            compat.partial(done.put, "Prio%s" % prio)))
      prev = ev

    # Start acquires
    first.set()

    # Wait for last acquire to start
    prev.wait()

    # Let threads acquire locks
    self.ls.release()

    # Wait for threads to finish
    self._waitThreads()

    # Both locks must have been handed out in order of priority
    for i in range(1, 33):
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)

    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1654
1655
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for locking.GanetiLockManager"""

  def setUp(self):
    """Create a manager with two nodes, two node groups and three instances"""
    _ThreadedTestCase.setUp(self)
    self.nodes = ['n1', 'n2']
    self.nodegroups = ['g1', 'g2']
    self.instances = ['i1', 'i2', 'i3']
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
                                        self.instances)

  def tearDown(self):
    """Reset the class-level instance marker so another manager can be built"""
    # Don't try this at home...
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    """Each entry of locking.LEVELS must be equal to its own index"""
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    """Creating a second manager while one exists raises AssertionError"""
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])

  def testLockNames(self):
    """Every level reports the names the manager was created with"""
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testInitAndResources(self):
    """Managers built from different resource lists expose matching names"""
    combinations = [
      ([], [], []),
      (self.nodes, self.nodegroups, []),
      ([], [], self.instances),
      ]

    for (nodes, nodegroups, instances) in combinations:
      # Drop the previous manager's marker before creating a new one
      locking.GanetiLockManager._instance = None
      self.GL = locking.GanetiLockManager(nodes, nodegroups, instances)
      self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
      self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
      self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                       set(nodegroups))
      self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                       set(instances))

  def testAcquireRelease(self):
    """Acquire and release named locks and check the owned sets in between"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
                                        shared=1))
    # Only "i1" is held, so the pair must not be reported as owned
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))

    # Releasing a single node leaves the other levels untouched
    self.GL.release(locking.LEVEL_NODE, ['n2'])
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))

    # Releasing without names empties the whole level
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)

    # "i5" was never registered with the manager
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))

  def testAcquireWholeSets(self):
    """Passing None as names acquires every lock of the level"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                     set(self.nodegroups))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    """A whole-level acquire can be followed by a named acquire elsewhere"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                     set(['n2']))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    """Other levels need the BGL, and the BGL can't be released before them"""
    # Without the BGL no other level may be acquired
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1', 'n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i3'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ['g1'])

    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

    # While a node lock is held the BGL must stay
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)

    # Same for instance locks
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)

    # ... and for a partially released node group level
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testWrongOrder(self):
    """With a node lock held, further acquires raise AssertionError"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODEGROUP, ['g1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i2'])

  def testModifiableLevels(self):
    """Names can be added/removed at every level except the cluster level"""
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
                      ['BGL2'])
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
    self.GL.add(locking.LEVEL_NODE, ['n3'])
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
                      ['BGL2'])

  def _doLock(self, level, names, shared):
    """Thread helper sharing the BGL and acquiring locks at another level

    Puts 'DONE' on self.done once the locks at C{level} are held and 'ERR' if
    a L{errors.LockError} is raised. Whatever was acquired is released again,
    even if a later step fails.

    @param level: locking level to acquire at
    @param names: lock name(s) passed on to the manager's acquire
    @param shared: passed on as the "shared" argument for C{level}

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
      try:
        self.GL.acquire(level, names, shared=shared)
        try:
          self.done.put('DONE')
        finally:
          self.GL.release(level)
      finally:
        # The BGL goes last; it must not outlive a failure further up
        self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put('ERR')

  @_Repeat
  def testConcurrency(self):
    """Exercise shared and exclusive instance locks from a second thread"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

    # Nothing is held at instance level, the thread gets "i1" right away
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    # Holding "i3" exclusively doesn't affect a thread wanting "i1"
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    # A thread wanting "i3" must not have finished before we release it
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    # Two shared holders of "i2" are fine
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    # An exclusive request for "i2" must not finish until we let go of it
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1869
1870
class TestLockMonitor(_ThreadedTestCase):
  """Tests for locking.LockMonitor and its QueryLocks output"""

  def setUp(self):
    """Create a fresh lock monitor for every test"""
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()

  def testSingleThread(self):
    """Locks show up in the monitor and disappear once unreferenced"""
    locks = [locking.SharedLock("TestLock%s" % idx, monitor=self.lm)
             for idx in range(100)]

    self.assertEqual(len(self.lm._locks), len(locks))
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
    self.assertEqual(len(result.fields), 1)
    self.assertEqual(len(result.data), 100)

    # Delete all locks
    del locks[:]

    # The garbage collector might need some time
    def _CheckLocks():
      if self.lm._locks:
        raise utils.RetryAgain()

    utils.Retry(_CheckLocks, 0.1, 30.0)

    self.assertFalse(self.lm._locks)

  def testMultiThread(self):
    """Check query results for locks created and held by several threads"""
    locks = []

    def _CreateLock(prev_ev, next_ev, name):
      # Wait for the predecessor, so locks are created in a known order
      prev_ev.wait()
      locks.append(locking.SharedLock(name, monitor=self.lm))
      if next_ev:
        next_ev.set()

    expnames = []

    first = threading.Event()
    prev = first

    # Use a deterministic random generator
    for num in random.Random(4263).sample(range(100), 33):
      name = "MtTestLock%s" % num
      expnames.append(name)

      ev = threading.Event()
      self._addThread(target=_CreateLock, args=(prev, ev, name))
      prev = ev

    # Add locks
    first.set()
    self._waitThreads()

    # Check order in which locks were added
    self.assertEqual([lock.name for lock in locks], expnames)

    # Check query result
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
    self.assertTrue(isinstance(result, dict))
    response = objects.QueryResponse.FromDict(result)
    self.assertEqual(response.data,
                     [[(constants.RS_NORMAL, name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, [])]
                      for name in utils.NiceSort(expnames)])
    self.assertEqual(len(response.fields), 4)
    self.assertEqual(["name", "mode", "owner", "pending"],
                     [fdef.name for fdef in response.fields])

    # Test exclusive acquire
    for tlock in locks[::4]:
      tlock.acquire(shared=0)
      try:
        def _GetExpResult(name):
          # Only the lock held right now is expected to have mode and owner
          if tlock.name == name:
            return [(constants.RS_NORMAL, name),
                    (constants.RS_NORMAL, "exclusive"),
                    (constants.RS_NORMAL,
                     [threading.currentThread().getName()]),
                    (constants.RS_NORMAL, [])]
          return [(constants.RS_NORMAL, name),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, [])]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [_GetExpResult(name)
                          for name in utils.NiceSort(expnames)])
      finally:
        tlock.release()

    # Test shared acquire
    def _Acquire(lock, shared, ev, notify):
      # Hold the lock until "ev" is set; "notify" reports the acquisition
      lock.acquire(shared=shared)
      try:
        notify.set()
        ev.wait()
      finally:
        lock.release()

    for tlock1 in locks[::11]:
      for tlock2 in locks[::-15]:
        if tlock2 == tlock1:
          # Avoid deadlocks
          continue

        for tlock3 in locks[::10]:
          if tlock3 in (tlock2, tlock1):
            # Avoid deadlocks
            continue

          releaseev = threading.Event()

          # Acquire locks
          acquireev = []
          tthreads1 = []
          for _ in range(3):
            ev = threading.Event()
            tthreads1.append(self._addThread(target=_Acquire,
                                             args=(tlock1, 1, releaseev, ev)))
            acquireev.append(ev)

          ev = threading.Event()
          tthread2 = self._addThread(target=_Acquire,
                                     args=(tlock2, 1, releaseev, ev))
          acquireev.append(ev)

          ev = threading.Event()
          tthread3 = self._addThread(target=_Acquire,
                                     args=(tlock3, 0, releaseev, ev))
          acquireev.append(ev)

          # Wait for all locks to be acquired
          for acquired in acquireev:
            acquired.wait()

          # Check query result
          result = self.lm.QueryLocks(["name", "mode", "owner"])
          response = objects.QueryResponse.FromDict(result)
          for (name, mode, owner) in response.data:
            (name_status, name_value) = name
            (owner_status, owner_value) = owner

            self.assertEqual(name_status, constants.RS_NORMAL)
            self.assertEqual(owner_status, constants.RS_NORMAL)

            if name_value == tlock1.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(set(owner_value),
                               set(thread.getName() for thread in tthreads1))
              continue

            if name_value == tlock2.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(owner_value, [tthread2.getName()])
              continue

            if name_value == tlock3.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
              self.assertEqual(owner_value, [tthread3.getName()])
              continue

            # All remaining locks are not held by anyone
            self.assertTrue(name_value in expnames)
            self.assertEqual(mode, (constants.RS_NORMAL, None))
            self.assertTrue(owner_value is None)

          # Release locks again
          releaseev.set()

          self._waitThreads()

          result = self.lm.QueryLocks(["name", "mode", "owner"])
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
                           [[(constants.RS_NORMAL, name),
                             (constants.RS_NORMAL, None),
                             (constants.RS_NORMAL, None)]
                            for name in utils.NiceSort(expnames)])

  def testDelete(self):
    """A deleted but still referenced lock is reported with mode "deleted\""""
    lock = locking.SharedLock("TestLock", monitor=self.lm)

    self.assertEqual(len(self.lm._locks), 1)
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]])

    lock.delete()

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]])
    self.assertEqual(len(self.lm._locks), 1)

  def testPending(self):
    """Waiting acquires are listed in the "pending" field"""
    def _Acquire(lock, shared, prev_ev, notify_ev):
      prev_ev.wait()

      # "notify_ev" is set through the lock's test_notify hook; the lock is
      # given back as soon as it was granted
      lock.acquire(shared=shared, test_notify=notify_ev.set)
      lock.release()

    lock = locking.SharedLock("ExcLock", monitor=self.lm)

    for shared in [0, 1]:
      lock.acquire()
      try:
        self.assertEqual(len(self.lm._locks), 1)
        result = self.lm.QueryLocks(["name", "mode", "owner"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()])]])

        threads = []

        first = threading.Event()
        prev = first

        for _ in range(5):
          ev = threading.Event()
          threads.append(self._addThread(target=_Acquire,
                                          args=(lock, shared, prev, ev)))
          prev = ev

        # Start acquires
        first.set()

        # Wait for last acquire to start waiting
        prev.wait()

        # NOTE: This works only because QueryLocks will acquire the
        # lock-internal lock again and won't be able to get the information
        # until it has the lock. By then the acquire should be registered in
        # SharedLock.__pending (otherwise it's a bug).

        # All acquires are waiting now
        if shared:
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
        else:
          pending = [("exclusive", [t.getName()]) for t in threads]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()]),
                           (constants.RS_NORMAL, pending)]])

        self.assertEqual(len(self.lm._locks), 1)
      finally:
        lock.release()

      self._waitThreads()

      # No pending acquires
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       [[(constants.RS_NORMAL, lock.name),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, [])]])

      self.assertEqual(len(self.lm._locks), 1)

  def testDeleteAndRecreate(self):
    """Locks sharing one name are tracked separately and in creation order"""
    lname = "TestLock101923193"

    # Create some locks with the same name and keep all references
    locks = [locking.SharedLock(lname, monitor=self.lm)
             for _ in range(5)]

    self.assertEqual(len(self.lm._locks), len(locks))

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 5)

    locks[2].delete()

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2 +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]] +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2)

    locks[1].acquire(shared=0)

    # Expected rows, in the same order as the entries of "locks"
    last_status = [
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "exclusive"),
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "deleted"),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      ]

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)

    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
    self.assertEqual(len(self.lm._locks), len(locks))

    # Check lock deletion
    for idx in range(len(locks)):
      # Always drop the oldest lock, so the first remaining row goes away
      del locks[0]
      self.assertTrue(gc.isenabled())
      gc.collect()
      self.assertEqual(len(self.lm._locks), len(locks))
      result = self.lm.QueryLocks(["name", "mode", "owner"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       last_status[idx + 1:])

    # All locks should have been deleted
    self.assertFalse(locks)
    self.assertFalse(self.lm._locks)

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])

  class _FakeLock:
    """Lock stand-in handing out prepared results from GetLockInfo"""
    def __init__(self):
      # Queue of (expected request, result) tuples, oldest first
      self._info = []

    def AddResult(self, *args):
      """Queue one (expected request, result) pair"""
      self._info.append(args)

    def CountPending(self):
      """Return the number of queued results not yet fetched"""
      return len(self._info)

    def GetLockInfo(self, requested):
      """Return the oldest queued result after checking the request

      Raises an Exception if C{requested} differs from what was queued
      together with the result.

      """
      (exp_requested, result) = self._info.pop(0)

      if exp_requested != requested:
        raise Exception("Requested information (%s) does not match"
                        " expectations (%s)" % (requested, exp_requested))

      return result

  def testMultipleResults(self):
    """Check merging and ordering of results coming from several locks"""
    fl1 = self._FakeLock()
    fl2 = self._FakeLock()

    self.lm.RegisterLock(fl1)
    self.lm.RegisterLock(fl2)

    # Empty information
    for fake in [fl1, fl2]:
      fake.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
    for fake in [fl1, fl2]:
      self.assertEqual(fake.CountPending(), 0)

    # Check ordering
    for fn in [lambda x: x, reversed, sorted]:
      fl1.AddResult(set(), list(fn([
        ("aaa", None, None, None),
        ("bbb", None, None, None),
        ])))
      fl2.AddResult(set(), [])
      result = self.lm.QueryLocks(["name"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
        [(constants.RS_NORMAL, "aaa")],
        [(constants.RS_NORMAL, "bbb")],
        ])
      for fake in [fl1, fl2]:
        self.assertEqual(fake.CountPending(), 0)

      for fn2 in [lambda x: x, reversed, sorted]:
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
          # Same name, but different information
          ("aaa", "mode0", None, None),
          ("aaa", "mode1", None, None),
          ("aaa", "mode2", None, None),
          ("aaa", "mode3", None, None),
          ])))
        fl2.AddResult(set([query.LQ_MODE]), [
          ("zzz", "end", None, None),
          ("000", "start", None, None),
          ] + list(fn2([
          ("aaa", "b200", None, None),
          ("aaa", "b300", None, None),
          ])))
        result = self.lm.QueryLocks(["name", "mode"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
          ] + list(fn([
          # Name is the same, so order must be equal to incoming order
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
          ])) + list(fn2([
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
          ])) + [
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
          ])
        for fake in [fl1, fl2]:
          self.assertEqual(fake.CountPending(), 0)
2303
2304
# When executed as a script (not imported), hand over to the project's
# testutils.GanetiTestProgram -- presumably the unittest runner; confirm in
# testutils.
if __name__ == '__main__':
  testutils.GanetiTestProgram()