cmdlib: Sort list of fields for QueryFields
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import itertools
32
33 from ganeti import locking
34 from ganeti import errors
35 from ganeti import utils
36 from ganeti import compat
37
38 import testutils
39
40
41 # This is used to test the ssynchronize decorator.
42 # Since it's passed as input to a decorator it must be declared as a global.
43 _decoratorlock = locking.SharedLock("decorator lock")
44
45 #: List for looping tests
46 ITERATIONS = range(8)
47
48
49 def _Repeat(fn):
50   """Decorator for executing a function many times"""
51   def wrapper(*args, **kwargs):
52     for i in ITERATIONS:
53       fn(*args, **kwargs)
54   return wrapper
55
56
57 def SafeSleep(duration):
58   start = time.time()
59   while True:
60     delay = start + duration - time.time()
61     if delay <= 0.0:
62       break
63     time.sleep(delay)
64
65
66 class _ThreadedTestCase(unittest.TestCase):
67   """Test class that supports adding/waiting on threads"""
68   def setUp(self):
69     unittest.TestCase.setUp(self)
70     self.done = Queue.Queue(0)
71     self.threads = []
72
73   def _addThread(self, *args, **kwargs):
74     """Create and remember a new thread"""
75     t = threading.Thread(*args, **kwargs)
76     self.threads.append(t)
77     t.start()
78     return t
79
80   def _waitThreads(self):
81     """Wait for all our threads to finish"""
82     for t in self.threads:
83       t.join(60)
84       self.failIf(t.isAlive())
85     self.threads = []
86
87
88 class _ConditionTestCase(_ThreadedTestCase):
89   """Common test case for conditions"""
90
91   def setUp(self, cls):
92     _ThreadedTestCase.setUp(self)
93     self.lock = threading.Lock()
94     self.cond = cls(self.lock)
95
96   def _testAcquireRelease(self):
97     self.assertFalse(self.cond._is_owned())
98     self.assertRaises(RuntimeError, self.cond.wait)
99     self.assertRaises(RuntimeError, self.cond.notifyAll)
100
101     self.cond.acquire()
102     self.assert_(self.cond._is_owned())
103     self.cond.notifyAll()
104     self.assert_(self.cond._is_owned())
105     self.cond.release()
106
107     self.assertFalse(self.cond._is_owned())
108     self.assertRaises(RuntimeError, self.cond.wait)
109     self.assertRaises(RuntimeError, self.cond.notifyAll)
110
111   def _testNotification(self):
112     def _NotifyAll():
113       self.done.put("NE")
114       self.cond.acquire()
115       self.done.put("NA")
116       self.cond.notifyAll()
117       self.done.put("NN")
118       self.cond.release()
119
120     self.cond.acquire()
121     self._addThread(target=_NotifyAll)
122     self.assertEqual(self.done.get(True, 1), "NE")
123     self.assertRaises(Queue.Empty, self.done.get_nowait)
124     self.cond.wait()
125     self.assertEqual(self.done.get(True, 1), "NA")
126     self.assertEqual(self.done.get(True, 1), "NN")
127     self.assert_(self.cond._is_owned())
128     self.cond.release()
129     self.assertFalse(self.cond._is_owned())
130
131
132 class TestSingleNotifyPipeCondition(_ConditionTestCase):
133   """SingleNotifyPipeCondition tests"""
134
135   def setUp(self):
136     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
137
138   def testAcquireRelease(self):
139     self._testAcquireRelease()
140
141   def testNotification(self):
142     self._testNotification()
143
144   def testWaitReuse(self):
145     self.cond.acquire()
146     self.cond.wait(0)
147     self.cond.wait(0.1)
148     self.cond.release()
149
150   def testNoNotifyReuse(self):
151     self.cond.acquire()
152     self.cond.notifyAll()
153     self.assertRaises(RuntimeError, self.cond.wait)
154     self.assertRaises(RuntimeError, self.cond.notifyAll)
155     self.cond.release()
156
157
158 class TestPipeCondition(_ConditionTestCase):
159   """PipeCondition tests"""
160
161   def setUp(self):
162     _ConditionTestCase.setUp(self, locking.PipeCondition)
163
164   def testAcquireRelease(self):
165     self._testAcquireRelease()
166
167   def testNotification(self):
168     self._testNotification()
169
170   def _TestWait(self, fn):
171     threads = [
172       self._addThread(target=fn),
173       self._addThread(target=fn),
174       self._addThread(target=fn),
175       ]
176
177     # Wait for threads to be waiting
178     for _ in threads:
179       self.assertEqual(self.done.get(True, 1), "A")
180
181     self.assertRaises(Queue.Empty, self.done.get_nowait)
182
183     self.cond.acquire()
184     self.assertEqual(len(self.cond._waiters), 3)
185     self.assertEqual(self.cond._waiters, set(threads))
186     # This new thread can't acquire the lock, and thus call wait, before we
187     # release it
188     self._addThread(target=fn)
189     self.cond.notifyAll()
190     self.assertRaises(Queue.Empty, self.done.get_nowait)
191     self.cond.release()
192
193     # We should now get 3 W and 1 A (for the new thread) in whatever order
194     w = 0
195     a = 0
196     for i in range(4):
197       got = self.done.get(True, 1)
198       if got == "W":
199         w += 1
200       elif got == "A":
201         a += 1
202       else:
203         self.fail("Got %s on the done queue" % got)
204
205     self.assertEqual(w, 3)
206     self.assertEqual(a, 1)
207
208     self.cond.acquire()
209     self.cond.notifyAll()
210     self.cond.release()
211     self._waitThreads()
212     self.assertEqual(self.done.get_nowait(), "W")
213     self.assertRaises(Queue.Empty, self.done.get_nowait)
214
215   def testBlockingWait(self):
216     def _BlockingWait():
217       self.cond.acquire()
218       self.done.put("A")
219       self.cond.wait()
220       self.cond.release()
221       self.done.put("W")
222
223     self._TestWait(_BlockingWait)
224
225   def testLongTimeoutWait(self):
226     def _Helper():
227       self.cond.acquire()
228       self.done.put("A")
229       self.cond.wait(15.0)
230       self.cond.release()
231       self.done.put("W")
232
233     self._TestWait(_Helper)
234
235   def _TimeoutWait(self, timeout, check):
236     self.cond.acquire()
237     self.cond.wait(timeout)
238     self.cond.release()
239     self.done.put(check)
240
241   def testShortTimeoutWait(self):
242     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
243     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
244     self._waitThreads()
245     self.assertEqual(self.done.get_nowait(), "T1")
246     self.assertEqual(self.done.get_nowait(), "T1")
247     self.assertRaises(Queue.Empty, self.done.get_nowait)
248
249   def testZeroTimeoutWait(self):
250     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
251     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
252     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
253     self._waitThreads()
254     self.assertEqual(self.done.get_nowait(), "T0")
255     self.assertEqual(self.done.get_nowait(), "T0")
256     self.assertEqual(self.done.get_nowait(), "T0")
257     self.assertRaises(Queue.Empty, self.done.get_nowait)
258
259
260 class TestSharedLock(_ThreadedTestCase):
261   """SharedLock tests"""
262
263   def setUp(self):
264     _ThreadedTestCase.setUp(self)
265     self.sl = locking.SharedLock("TestSharedLock")
266
267   def testSequenceAndOwnership(self):
268     self.assertFalse(self.sl._is_owned())
269     self.sl.acquire(shared=1)
270     self.assert_(self.sl._is_owned())
271     self.assert_(self.sl._is_owned(shared=1))
272     self.assertFalse(self.sl._is_owned(shared=0))
273     self.sl.release()
274     self.assertFalse(self.sl._is_owned())
275     self.sl.acquire()
276     self.assert_(self.sl._is_owned())
277     self.assertFalse(self.sl._is_owned(shared=1))
278     self.assert_(self.sl._is_owned(shared=0))
279     self.sl.release()
280     self.assertFalse(self.sl._is_owned())
281     self.sl.acquire(shared=1)
282     self.assert_(self.sl._is_owned())
283     self.assert_(self.sl._is_owned(shared=1))
284     self.assertFalse(self.sl._is_owned(shared=0))
285     self.sl.release()
286     self.assertFalse(self.sl._is_owned())
287
288   def testBooleanValue(self):
289     # semaphores are supposed to return a true value on a successful acquire
290     self.assert_(self.sl.acquire(shared=1))
291     self.sl.release()
292     self.assert_(self.sl.acquire())
293     self.sl.release()
294
295   def testDoubleLockingStoE(self):
296     self.sl.acquire(shared=1)
297     self.assertRaises(AssertionError, self.sl.acquire)
298
299   def testDoubleLockingEtoS(self):
300     self.sl.acquire()
301     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
302
303   def testDoubleLockingStoS(self):
304     self.sl.acquire(shared=1)
305     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306
307   def testDoubleLockingEtoE(self):
308     self.sl.acquire()
309     self.assertRaises(AssertionError, self.sl.acquire)
310
311   # helper functions: called in a separate thread they acquire the lock, send
312   # their identifier on the done queue, then release it.
313   def _doItSharer(self):
314     try:
315       self.sl.acquire(shared=1)
316       self.done.put('SHR')
317       self.sl.release()
318     except errors.LockError:
319       self.done.put('ERR')
320
321   def _doItExclusive(self):
322     try:
323       self.sl.acquire()
324       self.done.put('EXC')
325       self.sl.release()
326     except errors.LockError:
327       self.done.put('ERR')
328
329   def _doItDelete(self):
330     try:
331       self.sl.delete()
332       self.done.put('DEL')
333     except errors.LockError:
334       self.done.put('ERR')
335
336   def testSharersCanCoexist(self):
337     self.sl.acquire(shared=1)
338     threading.Thread(target=self._doItSharer).start()
339     self.assert_(self.done.get(True, 1))
340     self.sl.release()
341
342   @_Repeat
343   def testExclusiveBlocksExclusive(self):
344     self.sl.acquire()
345     self._addThread(target=self._doItExclusive)
346     self.assertRaises(Queue.Empty, self.done.get_nowait)
347     self.sl.release()
348     self._waitThreads()
349     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
350
351   @_Repeat
352   def testExclusiveBlocksDelete(self):
353     self.sl.acquire()
354     self._addThread(target=self._doItDelete)
355     self.assertRaises(Queue.Empty, self.done.get_nowait)
356     self.sl.release()
357     self._waitThreads()
358     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
359     self.sl = locking.SharedLock(self.sl.name)
360
361   @_Repeat
362   def testExclusiveBlocksSharer(self):
363     self.sl.acquire()
364     self._addThread(target=self._doItSharer)
365     self.assertRaises(Queue.Empty, self.done.get_nowait)
366     self.sl.release()
367     self._waitThreads()
368     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
369
370   @_Repeat
371   def testSharerBlocksExclusive(self):
372     self.sl.acquire(shared=1)
373     self._addThread(target=self._doItExclusive)
374     self.assertRaises(Queue.Empty, self.done.get_nowait)
375     self.sl.release()
376     self._waitThreads()
377     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
378
379   @_Repeat
380   def testSharerBlocksDelete(self):
381     self.sl.acquire(shared=1)
382     self._addThread(target=self._doItDelete)
383     self.assertRaises(Queue.Empty, self.done.get_nowait)
384     self.sl.release()
385     self._waitThreads()
386     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
387     self.sl = locking.SharedLock(self.sl.name)
388
389   @_Repeat
390   def testWaitingExclusiveBlocksSharer(self):
391     """SKIPPED testWaitingExclusiveBlockSharer"""
392     return
393
394     self.sl.acquire(shared=1)
395     # the lock is acquired in shared mode...
396     self._addThread(target=self._doItExclusive)
397     # ...but now an exclusive is waiting...
398     self._addThread(target=self._doItSharer)
399     # ...so the sharer should be blocked as well
400     self.assertRaises(Queue.Empty, self.done.get_nowait)
401     self.sl.release()
402     self._waitThreads()
403     # The exclusive passed before
404     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
405     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
406
407   @_Repeat
408   def testWaitingSharerBlocksExclusive(self):
409     """SKIPPED testWaitingSharerBlocksExclusive"""
410     return
411
412     self.sl.acquire()
413     # the lock is acquired in exclusive mode...
414     self._addThread(target=self._doItSharer)
415     # ...but now a sharer is waiting...
416     self._addThread(target=self._doItExclusive)
417     # ...the exclusive is waiting too...
418     self.assertRaises(Queue.Empty, self.done.get_nowait)
419     self.sl.release()
420     self._waitThreads()
421     # The sharer passed before
422     self.assertEqual(self.done.get_nowait(), 'SHR')
423     self.assertEqual(self.done.get_nowait(), 'EXC')
424
425   def testDelete(self):
426     self.sl.delete()
427     self.assertRaises(errors.LockError, self.sl.acquire)
428     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
429     self.assertRaises(errors.LockError, self.sl.delete)
430
431   def testDeleteTimeout(self):
432     self.sl.delete(timeout=60)
433
434   def testNoDeleteIfSharer(self):
435     self.sl.acquire(shared=1)
436     self.assertRaises(AssertionError, self.sl.delete)
437
438   @_Repeat
439   def testDeletePendingSharersExclusiveDelete(self):
440     self.sl.acquire()
441     self._addThread(target=self._doItSharer)
442     self._addThread(target=self._doItSharer)
443     self._addThread(target=self._doItExclusive)
444     self._addThread(target=self._doItDelete)
445     self.sl.delete()
446     self._waitThreads()
447     # The threads who were pending return ERR
448     for _ in range(4):
449       self.assertEqual(self.done.get_nowait(), 'ERR')
450     self.sl = locking.SharedLock(self.sl.name)
451
452   @_Repeat
453   def testDeletePendingDeleteExclusiveSharers(self):
454     self.sl.acquire()
455     self._addThread(target=self._doItDelete)
456     self._addThread(target=self._doItExclusive)
457     self._addThread(target=self._doItSharer)
458     self._addThread(target=self._doItSharer)
459     self.sl.delete()
460     self._waitThreads()
461     # The two threads who were pending return both ERR
462     self.assertEqual(self.done.get_nowait(), 'ERR')
463     self.assertEqual(self.done.get_nowait(), 'ERR')
464     self.assertEqual(self.done.get_nowait(), 'ERR')
465     self.assertEqual(self.done.get_nowait(), 'ERR')
466     self.sl = locking.SharedLock(self.sl.name)
467
468   @_Repeat
469   def testExclusiveAcquireTimeout(self):
470     for shared in [0, 1]:
471       on_queue = threading.Event()
472       release_exclusive = threading.Event()
473
474       def _LockExclusive():
475         self.sl.acquire(shared=0, test_notify=on_queue.set)
476         self.done.put("A: start wait")
477         release_exclusive.wait()
478         self.done.put("A: end wait")
479         self.sl.release()
480
481       # Start thread to hold lock in exclusive mode
482       self._addThread(target=_LockExclusive)
483
484       # Wait for wait to begin
485       self.assertEqual(self.done.get(timeout=60), "A: start wait")
486
487       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
488       # on the queue
489       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
490                                       test_notify=release_exclusive.set))
491
492       self.done.put("got 2nd")
493       self.sl.release()
494
495       self._waitThreads()
496
497       self.assertEqual(self.done.get_nowait(), "A: end wait")
498       self.assertEqual(self.done.get_nowait(), "got 2nd")
499       self.assertRaises(Queue.Empty, self.done.get_nowait)
500
501   @_Repeat
502   def testAcquireExpiringTimeout(self):
503     def _AcquireWithTimeout(shared, timeout):
504       if not self.sl.acquire(shared=shared, timeout=timeout):
505         self.done.put("timeout")
506
507     for shared in [0, 1]:
508       # Lock exclusively
509       self.sl.acquire()
510
511       # Start shared acquires with timeout between 0 and 20 ms
512       for i in range(11):
513         self._addThread(target=_AcquireWithTimeout,
514                         args=(shared, i * 2.0 / 1000.0))
515
516       # Wait for threads to finish (makes sure the acquire timeout expires
517       # before releasing the lock)
518       self._waitThreads()
519
520       # Release lock
521       self.sl.release()
522
523       for _ in range(11):
524         self.assertEqual(self.done.get_nowait(), "timeout")
525
526       self.assertRaises(Queue.Empty, self.done.get_nowait)
527
528   @_Repeat
529   def testSharedSkipExclusiveAcquires(self):
530     # Tests whether shared acquires jump in front of exclusive acquires in the
531     # queue.
532
533     def _Acquire(shared, name, notify_ev, wait_ev):
534       if notify_ev:
535         notify_fn = notify_ev.set
536       else:
537         notify_fn = None
538
539       if wait_ev:
540         wait_ev.wait()
541
542       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
543         return
544
545       self.done.put(name)
546       self.sl.release()
547
548     # Get exclusive lock while we fill the queue
549     self.sl.acquire()
550
551     shrcnt1 = 5
552     shrcnt2 = 7
553     shrcnt3 = 9
554     shrcnt4 = 2
555
556     # Add acquires using threading.Event for synchronization. They'll be
557     # acquired exactly in the order defined in this list.
558     acquires = (shrcnt1 * [(1, "shared 1")] +
559                 3 * [(0, "exclusive 1")] +
560                 shrcnt2 * [(1, "shared 2")] +
561                 shrcnt3 * [(1, "shared 3")] +
562                 shrcnt4 * [(1, "shared 4")] +
563                 3 * [(0, "exclusive 2")])
564
565     ev_cur = None
566     ev_prev = None
567
568     for args in acquires:
569       ev_cur = threading.Event()
570       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
571       ev_prev = ev_cur
572
573     # Wait for last acquire to start
574     ev_prev.wait()
575
576     # Expect 6 pending exclusive acquires and 1 for all shared acquires
577     # together
578     self.assertEqual(self.sl._count_pending(), 7)
579
580     # Release exclusive lock and wait
581     self.sl.release()
582
583     self._waitThreads()
584
585     # Check sequence
586     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
587       # Shared locks aren't guaranteed to be notified in order, but they'll be
588       # first
589       tmp = self.done.get_nowait()
590       if tmp == "shared 1":
591         shrcnt1 -= 1
592       elif tmp == "shared 2":
593         shrcnt2 -= 1
594       elif tmp == "shared 3":
595         shrcnt3 -= 1
596       elif tmp == "shared 4":
597         shrcnt4 -= 1
598     self.assertEqual(shrcnt1, 0)
599     self.assertEqual(shrcnt2, 0)
600     self.assertEqual(shrcnt3, 0)
601     self.assertEqual(shrcnt3, 0)
602
603     for _ in range(3):
604       self.assertEqual(self.done.get_nowait(), "exclusive 1")
605
606     for _ in range(3):
607       self.assertEqual(self.done.get_nowait(), "exclusive 2")
608
609     self.assertRaises(Queue.Empty, self.done.get_nowait)
610
611   @_Repeat
612   def testMixedAcquireTimeout(self):
613     sync = threading.Event()
614
615     def _AcquireShared(ev):
616       if not self.sl.acquire(shared=1, timeout=None):
617         return
618
619       self.done.put("shared")
620
621       # Notify main thread
622       ev.set()
623
624       # Wait for notification from main thread
625       sync.wait()
626
627       # Release lock
628       self.sl.release()
629
630     acquires = []
631     for _ in range(3):
632       ev = threading.Event()
633       self._addThread(target=_AcquireShared, args=(ev, ))
634       acquires.append(ev)
635
636     # Wait for all acquires to finish
637     for i in acquires:
638       i.wait()
639
640     self.assertEqual(self.sl._count_pending(), 0)
641
642     # Try to get exclusive lock
643     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
644
645     # Acquire exclusive without timeout
646     exclsync = threading.Event()
647     exclev = threading.Event()
648
649     def _AcquireExclusive():
650       if not self.sl.acquire(shared=0):
651         return
652
653       self.done.put("exclusive")
654
655       # Notify main thread
656       exclev.set()
657
658       # Wait for notification from main thread
659       exclsync.wait()
660
661       self.sl.release()
662
663     self._addThread(target=_AcquireExclusive)
664
665     # Try to get exclusive lock
666     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
667
668     # Make all shared holders release their locks
669     sync.set()
670
671     # Wait for exclusive acquire to succeed
672     exclev.wait()
673
674     self.assertEqual(self.sl._count_pending(), 0)
675
676     # Try to get exclusive lock
677     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
678
679     def _AcquireSharedSimple():
680       if self.sl.acquire(shared=1, timeout=None):
681         self.done.put("shared2")
682         self.sl.release()
683
684     for _ in range(10):
685       self._addThread(target=_AcquireSharedSimple)
686
687     # Tell exclusive lock to release
688     exclsync.set()
689
690     # Wait for everything to finish
691     self._waitThreads()
692
693     self.assertEqual(self.sl._count_pending(), 0)
694
695     # Check sequence
696     for _ in range(3):
697       self.assertEqual(self.done.get_nowait(), "shared")
698
699     self.assertEqual(self.done.get_nowait(), "exclusive")
700
701     for _ in range(10):
702       self.assertEqual(self.done.get_nowait(), "shared2")
703
704     self.assertRaises(Queue.Empty, self.done.get_nowait)
705
706   def testPriority(self):
707     # Acquire in exclusive mode
708     self.assert_(self.sl.acquire(shared=0))
709
710     # Queue acquires
711     def _Acquire(prev, next, shared, priority, result):
712       prev.wait()
713       self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
714       try:
715         self.done.put(result)
716       finally:
717         self.sl.release()
718
719     counter = itertools.count(0)
720     priorities = range(-20, 30)
721     first = threading.Event()
722     prev = first
723
724     # Data structure:
725     # {
726     #   priority:
727     #     [(shared/exclusive, set(acquire names), set(pending threads)),
728     #      (shared/exclusive, ...),
729     #      ...,
730     #     ],
731     # }
732     perprio = {}
733
734     # References shared acquire per priority in L{perprio}. Data structure:
735     # {
736     #   priority: (shared=1, set(acquire names), set(pending threads)),
737     # }
738     prioshared = {}
739
740     for seed in [4979, 9523, 14902, 32440]:
741       # Use a deterministic random generator
742       rnd = random.Random(seed)
743       for priority in [rnd.choice(priorities) for _ in range(30)]:
744         modes = [0, 1]
745         rnd.shuffle(modes)
746         for shared in modes:
747           # Unique name
748           acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
749
750           ev = threading.Event()
751           thread = self._addThread(target=_Acquire,
752                                    args=(prev, ev, shared, priority, acqname))
753           prev = ev
754
755           # Record expected aqcuire, see above for structure
756           data = (shared, set([acqname]), set([thread]))
757           priolist = perprio.setdefault(priority, [])
758           if shared:
759             priosh = prioshared.get(priority, None)
760             if priosh:
761               # Shared acquires are merged
762               for i, j in zip(priosh[1:], data[1:]):
763                 i.update(j)
764               assert data[0] == priosh[0]
765             else:
766               prioshared[priority] = data
767               priolist.append(data)
768           else:
769             priolist.append(data)
770
771     # Start all acquires and wait for them
772     first.set()
773     prev.wait()
774
775     # Check lock information
776     self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
777     self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
778                      ["exclusive", [threading.currentThread().getName()]])
779     self.assertEqual(self.sl.GetInfo(["name", "pending"]),
780                      [self.sl.name,
781                       [(["exclusive", "shared"][int(bool(shared))],
782                         sorted([t.getName() for t in threads]))
783                        for acquires in [perprio[i]
784                                         for i in sorted(perprio.keys())]
785                        for (shared, _, threads) in acquires]])
786
787     # Let threads acquire the lock
788     self.sl.release()
789
790     # Wait for everything to finish
791     self._waitThreads()
792
793     self.assert_(self.sl._check_empty())
794
795     # Check acquires by priority
796     for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797       for (_, names, _) in acquires:
798         # For shared acquires, the set will contain 1..n entries. For exclusive
799         # acquires only one.
800         while names:
801           names.remove(self.done.get_nowait())
802       self.assertFalse(compat.any(names for (_, names, _) in acquires))
803
804     self.assertRaises(Queue.Empty, self.done.get_nowait)
805
806
807 class TestSharedLockInCondition(_ThreadedTestCase):
808   """SharedLock as a condition lock tests"""
809
810   def setUp(self):
811     _ThreadedTestCase.setUp(self)
812     self.sl = locking.SharedLock("TestSharedLockInCondition")
813     self.setCondition()
814
815   def setCondition(self):
816     self.cond = threading.Condition(self.sl)
817
818   def testKeepMode(self):
819     self.cond.acquire(shared=1)
820     self.assert_(self.sl._is_owned(shared=1))
821     self.cond.wait(0)
822     self.assert_(self.sl._is_owned(shared=1))
823     self.cond.release()
824     self.cond.acquire(shared=0)
825     self.assert_(self.sl._is_owned(shared=0))
826     self.cond.wait(0)
827     self.assert_(self.sl._is_owned(shared=0))
828     self.cond.release()
829
830
831 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
832   """SharedLock as a pipe condition lock tests"""
833
834   def setCondition(self):
835     self.cond = locking.PipeCondition(self.sl)
836
837
838 class TestSSynchronizedDecorator(_ThreadedTestCase):
839   """Shared Lock Synchronized decorator test"""
840
841   def setUp(self):
842     _ThreadedTestCase.setUp(self)
843
844   @locking.ssynchronized(_decoratorlock)
845   def _doItExclusive(self):
846     self.assert_(_decoratorlock._is_owned())
847     self.done.put('EXC')
848
849   @locking.ssynchronized(_decoratorlock, shared=1)
850   def _doItSharer(self):
851     self.assert_(_decoratorlock._is_owned(shared=1))
852     self.done.put('SHR')
853
854   def testDecoratedFunctions(self):
855     self._doItExclusive()
856     self.assertFalse(_decoratorlock._is_owned())
857     self._doItSharer()
858     self.assertFalse(_decoratorlock._is_owned())
859
860   def testSharersCanCoexist(self):
861     _decoratorlock.acquire(shared=1)
862     threading.Thread(target=self._doItSharer).start()
863     self.assert_(self.done.get(True, 1))
864     _decoratorlock.release()
865
866   @_Repeat
867   def testExclusiveBlocksExclusive(self):
868     _decoratorlock.acquire()
869     self._addThread(target=self._doItExclusive)
870     # give it a bit of time to check that it's not actually doing anything
871     self.assertRaises(Queue.Empty, self.done.get_nowait)
872     _decoratorlock.release()
873     self._waitThreads()
874     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
875
876   @_Repeat
877   def testExclusiveBlocksSharer(self):
878     _decoratorlock.acquire()
879     self._addThread(target=self._doItSharer)
880     self.assertRaises(Queue.Empty, self.done.get_nowait)
881     _decoratorlock.release()
882     self._waitThreads()
883     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
884
885   @_Repeat
886   def testSharerBlocksExclusive(self):
887     _decoratorlock.acquire(shared=1)
888     self._addThread(target=self._doItExclusive)
889     self.assertRaises(Queue.Empty, self.done.get_nowait)
890     _decoratorlock.release()
891     self._waitThreads()
892     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
893
894
895 class TestLockSet(_ThreadedTestCase):
896   """LockSet tests"""
897
898   def setUp(self):
899     _ThreadedTestCase.setUp(self)
900     self._setUpLS()
901
902   def _setUpLS(self):
903     """Helper to (re)initialize the lock set"""
904     self.resources = ['one', 'two', 'three']
905     self.ls = locking.LockSet(self.resources, "TestLockSet")
906
907   def testResources(self):
908     self.assertEquals(self.ls._names(), set(self.resources))
909     newls = locking.LockSet([], "TestLockSet.testResources")
910     self.assertEquals(newls._names(), set())
911
912   def testAcquireRelease(self):
913     self.assert_(self.ls.acquire('one'))
914     self.assertEquals(self.ls._list_owned(), set(['one']))
915     self.ls.release()
916     self.assertEquals(self.ls._list_owned(), set())
917     self.assertEquals(self.ls.acquire(['one']), set(['one']))
918     self.assertEquals(self.ls._list_owned(), set(['one']))
919     self.ls.release()
920     self.assertEquals(self.ls._list_owned(), set())
921     self.ls.acquire(['one', 'two', 'three'])
922     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
923     self.ls.release('one')
924     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
925     self.ls.release(['three'])
926     self.assertEquals(self.ls._list_owned(), set(['two']))
927     self.ls.release()
928     self.assertEquals(self.ls._list_owned(), set())
929     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
930     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
931     self.ls.release()
932     self.assertEquals(self.ls._list_owned(), set())
933
934   def testNoDoubleAcquire(self):
935     self.ls.acquire('one')
936     self.assertRaises(AssertionError, self.ls.acquire, 'one')
937     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
938     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
939     self.ls.release()
940     self.ls.acquire(['one', 'three'])
941     self.ls.release('one')
942     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
943     self.ls.release('three')
944
945   def testNoWrongRelease(self):
946     self.assertRaises(AssertionError, self.ls.release)
947     self.ls.acquire('one')
948     self.assertRaises(AssertionError, self.ls.release, 'two')
949
950   def testAddRemove(self):
951     self.ls.add('four')
952     self.assertEquals(self.ls._list_owned(), set())
953     self.assert_('four' in self.ls._names())
954     self.ls.add(['five', 'six', 'seven'], acquired=1)
955     self.assert_('five' in self.ls._names())
956     self.assert_('six' in self.ls._names())
957     self.assert_('seven' in self.ls._names())
958     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
959     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
960     self.assert_('five' not in self.ls._names())
961     self.assert_('six' not in self.ls._names())
962     self.assertEquals(self.ls._list_owned(), set(['seven']))
963     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
964     self.ls.remove('seven')
965     self.assert_('seven' not in self.ls._names())
966     self.assertEquals(self.ls._list_owned(), set([]))
967     self.ls.acquire(None, shared=1)
968     self.assertRaises(AssertionError, self.ls.add, 'eight')
969     self.ls.release()
970     self.ls.acquire(None)
971     self.ls.add('eight', acquired=1)
972     self.assert_('eight' in self.ls._names())
973     self.assert_('eight' in self.ls._list_owned())
974     self.ls.add('nine')
975     self.assert_('nine' in self.ls._names())
976     self.assert_('nine' not in self.ls._list_owned())
977     self.ls.release()
978     self.ls.remove(['two'])
979     self.assert_('two' not in self.ls._names())
980     self.ls.acquire('three')
981     self.assertEquals(self.ls.remove(['three']), ['three'])
982     self.assert_('three' not in self.ls._names())
983     self.assertEquals(self.ls.remove('three'), [])
984     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
985     self.assert_('one' not in self.ls._names())
986
987   def testRemoveNonBlocking(self):
988     self.ls.acquire('one')
989     self.assertEquals(self.ls.remove('one'), ['one'])
990     self.ls.acquire(['two', 'three'])
991     self.assertEquals(self.ls.remove(['two', 'three']),
992                       ['two', 'three'])
993
994   def testNoDoubleAdd(self):
995     self.assertRaises(errors.LockError, self.ls.add, 'two')
996     self.ls.add('four')
997     self.assertRaises(errors.LockError, self.ls.add, 'four')
998
999   def testNoWrongRemoves(self):
1000     self.ls.acquire(['one', 'three'], shared=1)
1001     # Cannot remove 'two' while holding something which is not a superset
1002     self.assertRaises(AssertionError, self.ls.remove, 'two')
1003     # Cannot remove 'three' as we are sharing it
1004     self.assertRaises(AssertionError, self.ls.remove, 'three')
1005
1006   def testAcquireSetLock(self):
1007     # acquire the set-lock exclusively
1008     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1009     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1010     self.assertEquals(self.ls._is_owned(), True)
1011     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1012     # I can still add/remove elements...
1013     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1014     self.assert_(self.ls.add('six'))
1015     self.ls.release()
1016     # share the set-lock
1017     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1018     # adding new elements is not possible
1019     self.assertRaises(AssertionError, self.ls.add, 'five')
1020     self.ls.release()
1021
1022   def testAcquireWithRepetitions(self):
1023     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1024                       set(['two', 'two', 'three']))
1025     self.ls.release(['two', 'two'])
1026     self.assertEquals(self.ls._list_owned(), set(['three']))
1027
1028   def testEmptyAcquire(self):
1029     # Acquire an empty list of locks...
1030     self.assertEquals(self.ls.acquire([]), set())
1031     self.assertEquals(self.ls._list_owned(), set())
1032     # New locks can still be addded
1033     self.assert_(self.ls.add('six'))
1034     # "re-acquiring" is not an issue, since we had really acquired nothing
1035     self.assertEquals(self.ls.acquire([], shared=1), set())
1036     self.assertEquals(self.ls._list_owned(), set())
1037     # We haven't really acquired anything, so we cannot release
1038     self.assertRaises(AssertionError, self.ls.release)
1039
1040   def _doLockSet(self, names, shared):
1041     try:
1042       self.ls.acquire(names, shared=shared)
1043       self.done.put('DONE')
1044       self.ls.release()
1045     except errors.LockError:
1046       self.done.put('ERR')
1047
1048   def _doAddSet(self, names):
1049     try:
1050       self.ls.add(names, acquired=1)
1051       self.done.put('DONE')
1052       self.ls.release()
1053     except errors.LockError:
1054       self.done.put('ERR')
1055
1056   def _doRemoveSet(self, names):
1057     self.done.put(self.ls.remove(names))
1058
1059   @_Repeat
1060   def testConcurrentSharedAcquire(self):
1061     self.ls.acquire(['one', 'two'], shared=1)
1062     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1063     self._waitThreads()
1064     self.assertEqual(self.done.get_nowait(), 'DONE')
1065     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1066     self._waitThreads()
1067     self.assertEqual(self.done.get_nowait(), 'DONE')
1068     self._addThread(target=self._doLockSet, args=('three', 1))
1069     self._waitThreads()
1070     self.assertEqual(self.done.get_nowait(), 'DONE')
1071     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1072     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1073     self.assertRaises(Queue.Empty, self.done.get_nowait)
1074     self.ls.release()
1075     self._waitThreads()
1076     self.assertEqual(self.done.get_nowait(), 'DONE')
1077     self.assertEqual(self.done.get_nowait(), 'DONE')
1078
1079   @_Repeat
1080   def testConcurrentExclusiveAcquire(self):
1081     self.ls.acquire(['one', 'two'])
1082     self._addThread(target=self._doLockSet, args=('three', 1))
1083     self._waitThreads()
1084     self.assertEqual(self.done.get_nowait(), 'DONE')
1085     self._addThread(target=self._doLockSet, args=('three', 0))
1086     self._waitThreads()
1087     self.assertEqual(self.done.get_nowait(), 'DONE')
1088     self.assertRaises(Queue.Empty, self.done.get_nowait)
1089     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1090     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1091     self._addThread(target=self._doLockSet, args=('one', 0))
1092     self._addThread(target=self._doLockSet, args=('one', 1))
1093     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1094     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1095     self.assertRaises(Queue.Empty, self.done.get_nowait)
1096     self.ls.release()
1097     self._waitThreads()
1098     for _ in range(6):
1099       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1100
1101   @_Repeat
1102   def testSimpleAcquireTimeoutExpiring(self):
1103     names = sorted(self.ls._names())
1104     self.assert_(len(names) >= 3)
1105
1106     # Get name of first lock
1107     first = names[0]
1108
1109     # Get name of last lock
1110     last = names.pop()
1111
1112     checks = [
1113       # Block first and try to lock it again
1114       (first, first),
1115
1116       # Block last and try to lock all locks
1117       (None, first),
1118
1119       # Block last and try to lock it again
1120       (last, last),
1121       ]
1122
1123     for (wanted, block) in checks:
1124       # Lock in exclusive mode
1125       self.assert_(self.ls.acquire(block, shared=0))
1126
1127       def _AcquireOne():
1128         # Try to get the same lock again with a timeout (should never succeed)
1129         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1130         if acquired:
1131           self.done.put("acquired")
1132           self.ls.release()
1133         else:
1134           self.assert_(acquired is None)
1135           self.assertFalse(self.ls._list_owned())
1136           self.assertFalse(self.ls._is_owned())
1137           self.done.put("not acquired")
1138
1139       self._addThread(target=_AcquireOne)
1140
1141       # Wait for timeout in thread to expire
1142       self._waitThreads()
1143
1144       # Release exclusive lock again
1145       self.ls.release()
1146
1147       self.assertEqual(self.done.get_nowait(), "not acquired")
1148       self.assertRaises(Queue.Empty, self.done.get_nowait)
1149
1150   @_Repeat
1151   def testDelayedAndExpiringLockAcquire(self):
1152     self._setUpLS()
1153     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1154
1155     for expire in (False, True):
1156       names = sorted(self.ls._names())
1157       self.assertEqual(len(names), 8)
1158
1159       lock_ev = dict([(i, threading.Event()) for i in names])
1160
1161       # Lock all in exclusive mode
1162       self.assert_(self.ls.acquire(names, shared=0))
1163
1164       if expire:
1165         # We'll wait at least 300ms per lock
1166         lockwait = len(names) * [0.3]
1167
1168         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1169         # this gives us up to 2.4s to fail.
1170         lockall_timeout = 0.4
1171       else:
1172         # This should finish rather quickly
1173         lockwait = None
1174         lockall_timeout = len(names) * 5.0
1175
1176       def _LockAll():
1177         def acquire_notification(name):
1178           if not expire:
1179             self.done.put("getting %s" % name)
1180
1181           # Kick next lock
1182           lock_ev[name].set()
1183
1184         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1185                            test_notify=acquire_notification):
1186           self.done.put("got all")
1187           self.ls.release()
1188         else:
1189           self.done.put("timeout on all")
1190
1191         # Notify all locks
1192         for ev in lock_ev.values():
1193           ev.set()
1194
1195       t = self._addThread(target=_LockAll)
1196
1197       for idx, name in enumerate(names):
1198         # Wait for actual acquire on this lock to start
1199         lock_ev[name].wait(10.0)
1200
1201         if expire and t.isAlive():
1202           # Wait some time after getting the notification to make sure the lock
1203           # acquire will expire
1204           SafeSleep(lockwait[idx])
1205
1206         self.ls.release(names=name)
1207
1208       self.assertFalse(self.ls._list_owned())
1209
1210       self._waitThreads()
1211
1212       if expire:
1213         # Not checking which locks were actually acquired. Doing so would be
1214         # too timing-dependant.
1215         self.assertEqual(self.done.get_nowait(), "timeout on all")
1216       else:
1217         for i in names:
1218           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1219         self.assertEqual(self.done.get_nowait(), "got all")
1220       self.assertRaises(Queue.Empty, self.done.get_nowait)
1221
1222   @_Repeat
1223   def testConcurrentRemove(self):
1224     self.ls.add('four')
1225     self.ls.acquire(['one', 'two', 'four'])
1226     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1227     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1228     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1229     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1230     self.assertRaises(Queue.Empty, self.done.get_nowait)
1231     self.ls.remove('one')
1232     self.ls.release()
1233     self._waitThreads()
1234     for i in range(4):
1235       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1236     self.ls.add(['five', 'six'], acquired=1)
1237     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1238     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1239     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1240     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1241     self.ls.remove('five')
1242     self.ls.release()
1243     self._waitThreads()
1244     for i in range(4):
1245       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1246     self.ls.acquire(['three', 'four'])
1247     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1248     self.assertRaises(Queue.Empty, self.done.get_nowait)
1249     self.ls.remove('four')
1250     self._waitThreads()
1251     self.assertEqual(self.done.get_nowait(), ['six'])
1252     self._addThread(target=self._doRemoveSet, args=(['two']))
1253     self._waitThreads()
1254     self.assertEqual(self.done.get_nowait(), ['two'])
1255     self.ls.release()
1256     # reset lockset
1257     self._setUpLS()
1258
1259   @_Repeat
1260   def testConcurrentSharedSetLock(self):
1261     # share the set-lock...
1262     self.ls.acquire(None, shared=1)
1263     # ...another thread can share it too
1264     self._addThread(target=self._doLockSet, args=(None, 1))
1265     self._waitThreads()
1266     self.assertEqual(self.done.get_nowait(), 'DONE')
1267     # ...or just share some elements
1268     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1269     self._waitThreads()
1270     self.assertEqual(self.done.get_nowait(), 'DONE')
1271     # ...but not add new ones or remove any
1272     t = self._addThread(target=self._doAddSet, args=(['nine']))
1273     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1274     self.assertRaises(Queue.Empty, self.done.get_nowait)
1275     # this just releases the set-lock
1276     self.ls.release([])
1277     t.join(60)
1278     self.assertEqual(self.done.get_nowait(), 'DONE')
1279     # release the lock on the actual elements so remove() can proceed too
1280     self.ls.release()
1281     self._waitThreads()
1282     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1283     # reset lockset
1284     self._setUpLS()
1285
1286   @_Repeat
1287   def testConcurrentExclusiveSetLock(self):
1288     # acquire the set-lock...
1289     self.ls.acquire(None, shared=0)
1290     # ...no one can do anything else
1291     self._addThread(target=self._doLockSet, args=(None, 1))
1292     self._addThread(target=self._doLockSet, args=(None, 0))
1293     self._addThread(target=self._doLockSet, args=(['three'], 0))
1294     self._addThread(target=self._doLockSet, args=(['two'], 1))
1295     self._addThread(target=self._doAddSet, args=(['nine']))
1296     self.assertRaises(Queue.Empty, self.done.get_nowait)
1297     self.ls.release()
1298     self._waitThreads()
1299     for _ in range(5):
1300       self.assertEqual(self.done.get(True, 1), 'DONE')
1301     # cleanup
1302     self._setUpLS()
1303
1304   @_Repeat
1305   def testConcurrentSetLockAdd(self):
1306     self.ls.acquire('one')
1307     # Another thread wants the whole SetLock
1308     self._addThread(target=self._doLockSet, args=(None, 0))
1309     self._addThread(target=self._doLockSet, args=(None, 1))
1310     self.assertRaises(Queue.Empty, self.done.get_nowait)
1311     self.assertRaises(AssertionError, self.ls.add, 'four')
1312     self.ls.release()
1313     self._waitThreads()
1314     self.assertEqual(self.done.get_nowait(), 'DONE')
1315     self.assertEqual(self.done.get_nowait(), 'DONE')
1316     self.ls.acquire(None)
1317     self._addThread(target=self._doLockSet, args=(None, 0))
1318     self._addThread(target=self._doLockSet, args=(None, 1))
1319     self.assertRaises(Queue.Empty, self.done.get_nowait)
1320     self.ls.add('four')
1321     self.ls.add('five', acquired=1)
1322     self.ls.add('six', acquired=1, shared=1)
1323     self.assertEquals(self.ls._list_owned(),
1324       set(['one', 'two', 'three', 'five', 'six']))
1325     self.assertEquals(self.ls._is_owned(), True)
1326     self.assertEquals(self.ls._names(),
1327       set(['one', 'two', 'three', 'four', 'five', 'six']))
1328     self.ls.release()
1329     self._waitThreads()
1330     self.assertEqual(self.done.get_nowait(), 'DONE')
1331     self.assertEqual(self.done.get_nowait(), 'DONE')
1332     self._setUpLS()
1333
1334   @_Repeat
1335   def testEmptyLockSet(self):
1336     # get the set-lock
1337     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1338     # now empty it...
1339     self.ls.remove(['one', 'two', 'three'])
1340     # and adds/locks by another thread still wait
1341     self._addThread(target=self._doAddSet, args=(['nine']))
1342     self._addThread(target=self._doLockSet, args=(None, 1))
1343     self._addThread(target=self._doLockSet, args=(None, 0))
1344     self.assertRaises(Queue.Empty, self.done.get_nowait)
1345     self.ls.release()
1346     self._waitThreads()
1347     for _ in range(3):
1348       self.assertEqual(self.done.get_nowait(), 'DONE')
1349     # empty it again...
1350     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1351     # now share it...
1352     self.assertEqual(self.ls.acquire(None, shared=1), set())
1353     # other sharers can go, adds still wait
1354     self._addThread(target=self._doLockSet, args=(None, 1))
1355     self._waitThreads()
1356     self.assertEqual(self.done.get_nowait(), 'DONE')
1357     self._addThread(target=self._doAddSet, args=(['nine']))
1358     self.assertRaises(Queue.Empty, self.done.get_nowait)
1359     self.ls.release()
1360     self._waitThreads()
1361     self.assertEqual(self.done.get_nowait(), 'DONE')
1362     self._setUpLS()
1363
1364   def testPriority(self):
1365     def _Acquire(prev, next, name, priority, success_fn):
1366       prev.wait()
1367       self.assert_(self.ls.acquire(name, shared=0,
1368                                    priority=priority,
1369                                    test_notify=lambda _: next.set()))
1370       try:
1371         success_fn()
1372       finally:
1373         self.ls.release()
1374
1375     # Get all in exclusive mode
1376     self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1377
1378     done_two = Queue.Queue(0)
1379
1380     first = threading.Event()
1381     prev = first
1382
1383     acquires = [("one", prio, self.done) for prio in range(1, 33)]
1384     acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1385
1386     # Use a deterministic random generator
1387     random.Random(741).shuffle(acquires)
1388
1389     for (name, prio, done) in acquires:
1390       ev = threading.Event()
1391       self._addThread(target=_Acquire,
1392                       args=(prev, ev, name, prio,
1393                             compat.partial(done.put, "Prio%s" % prio)))
1394       prev = ev
1395
1396     # Start acquires
1397     first.set()
1398
1399     # Wait for last acquire to start
1400     prev.wait()
1401
1402     # Let threads acquire locks
1403     self.ls.release()
1404
1405     # Wait for threads to finish
1406     self._waitThreads()
1407
1408     for i in range(1, 33):
1409       self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1410       self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1411
1412     self.assertRaises(Queue.Empty, self.done.get_nowait)
1413     self.assertRaises(Queue.Empty, done_two.get_nowait)
1414
1415
1416 class TestGanetiLockManager(_ThreadedTestCase):
1417
1418   def setUp(self):
1419     _ThreadedTestCase.setUp(self)
1420     self.nodes=['n1', 'n2']
1421     self.nodegroups=['g1', 'g2']
1422     self.instances=['i1', 'i2', 'i3']
1423     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1424                                         self.instances)
1425
1426   def tearDown(self):
1427     # Don't try this at home...
1428     locking.GanetiLockManager._instance = None
1429
1430   def testLockingConstants(self):
1431     # The locking library internally cheats by assuming its constants have some
1432     # relationships with each other. Check those hold true.
1433     # This relationship is also used in the Processor to recursively acquire
1434     # the right locks. Again, please don't break it.
1435     for i in range(len(locking.LEVELS)):
1436       self.assertEqual(i, locking.LEVELS[i])
1437
1438   def testDoubleGLFails(self):
1439     self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1440
1441   def testLockNames(self):
1442     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1443     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1444     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1445                      set(self.nodegroups))
1446     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1447                      set(self.instances))
1448
1449   def testInitAndResources(self):
1450     locking.GanetiLockManager._instance = None
1451     self.GL = locking.GanetiLockManager([], [], [])
1452     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1453     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1454     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1455     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1456
1457     locking.GanetiLockManager._instance = None
1458     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1459     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1460     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1461     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1462                                     set(self.nodegroups))
1463     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1464
1465     locking.GanetiLockManager._instance = None
1466     self.GL = locking.GanetiLockManager([], [], self.instances)
1467     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1468     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1469     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1470     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1471                      set(self.instances))
1472
1473   def testAcquireRelease(self):
1474     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1475     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1476     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1477     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1478     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1479     self.GL.release(locking.LEVEL_NODE, ['n2'])
1480     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1481     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1482     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1483     self.GL.release(locking.LEVEL_NODE)
1484     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1485     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1486     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1487     self.GL.release(locking.LEVEL_NODEGROUP)
1488     self.GL.release(locking.LEVEL_INSTANCE)
1489     self.assertRaises(errors.LockError, self.GL.acquire,
1490                       locking.LEVEL_INSTANCE, ['i5'])
1491     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1492     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1493
1494   def testAcquireWholeSets(self):
1495     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1496     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1497                       set(self.instances))
1498     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1499                       set(self.instances))
1500     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1501                       set(self.nodegroups))
1502     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1503                       set(self.nodegroups))
1504     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1505                       set(self.nodes))
1506     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1507                       set(self.nodes))
1508     self.GL.release(locking.LEVEL_NODE)
1509     self.GL.release(locking.LEVEL_NODEGROUP)
1510     self.GL.release(locking.LEVEL_INSTANCE)
1511     self.GL.release(locking.LEVEL_CLUSTER)
1512
1513   def testAcquireWholeAndPartial(self):
1514     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1515     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1516                       set(self.instances))
1517     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1518                       set(self.instances))
1519     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1520                       set(['n2']))
1521     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1522                       set(['n2']))
1523     self.GL.release(locking.LEVEL_NODE)
1524     self.GL.release(locking.LEVEL_INSTANCE)
1525     self.GL.release(locking.LEVEL_CLUSTER)
1526
1527   def testBGLDependency(self):
1528     self.assertRaises(AssertionError, self.GL.acquire,
1529                       locking.LEVEL_NODE, ['n1', 'n2'])
1530     self.assertRaises(AssertionError, self.GL.acquire,
1531                       locking.LEVEL_INSTANCE, ['i3'])
1532     self.assertRaises(AssertionError, self.GL.acquire,
1533                       locking.LEVEL_NODEGROUP, ['g1'])
1534     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1535     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1536     self.assertRaises(AssertionError, self.GL.release,
1537                       locking.LEVEL_CLUSTER, ['BGL'])
1538     self.assertRaises(AssertionError, self.GL.release,
1539                       locking.LEVEL_CLUSTER)
1540     self.GL.release(locking.LEVEL_NODE)
1541     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1542     self.assertRaises(AssertionError, self.GL.release,
1543                       locking.LEVEL_CLUSTER, ['BGL'])
1544     self.assertRaises(AssertionError, self.GL.release,
1545                       locking.LEVEL_CLUSTER)
1546     self.GL.release(locking.LEVEL_INSTANCE)
1547     self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1548     self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1549     self.assertRaises(AssertionError, self.GL.release,
1550                       locking.LEVEL_CLUSTER, ['BGL'])
1551     self.assertRaises(AssertionError, self.GL.release,
1552                       locking.LEVEL_CLUSTER)
1553     self.GL.release(locking.LEVEL_NODEGROUP)
1554     self.GL.release(locking.LEVEL_CLUSTER)
1555
1556   def testWrongOrder(self):
1557     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1558     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1559     self.assertRaises(AssertionError, self.GL.acquire,
1560                       locking.LEVEL_NODE, ['n1'])
1561     self.assertRaises(AssertionError, self.GL.acquire,
1562                       locking.LEVEL_NODEGROUP, ['g1'])
1563     self.assertRaises(AssertionError, self.GL.acquire,
1564                       locking.LEVEL_INSTANCE, ['i2'])
1565
1566   def testModifiableLevels(self):
1567     self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1568                       ['BGL2'])
1569     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1570     self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1571     self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1572     self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1573     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1574     self.GL.add(locking.LEVEL_NODE, ['n3'])
1575     self.GL.remove(locking.LEVEL_NODE, ['n1'])
1576     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1577     self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1578     self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1579     self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1580     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1581     self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1582                       ['BGL2'])
1583
1584   # Helper function to run as a thread that shared the BGL and then acquires
1585   # some locks at another level.
1586   def _doLock(self, level, names, shared):
1587     try:
1588       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1589       self.GL.acquire(level, names, shared=shared)
1590       self.done.put('DONE')
1591       self.GL.release(level)
1592       self.GL.release(locking.LEVEL_CLUSTER)
1593     except errors.LockError:
1594       self.done.put('ERR')
1595
1596   @_Repeat
1597   def testConcurrency(self):
1598     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1599     self._addThread(target=self._doLock,
1600                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1601     self._waitThreads()
1602     self.assertEqual(self.done.get_nowait(), 'DONE')
1603     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1604     self._addThread(target=self._doLock,
1605                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1606     self._waitThreads()
1607     self.assertEqual(self.done.get_nowait(), 'DONE')
1608     self._addThread(target=self._doLock,
1609                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1610     self.assertRaises(Queue.Empty, self.done.get_nowait)
1611     self.GL.release(locking.LEVEL_INSTANCE)
1612     self._waitThreads()
1613     self.assertEqual(self.done.get_nowait(), 'DONE')
1614     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1615     self._addThread(target=self._doLock,
1616                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1617     self._waitThreads()
1618     self.assertEqual(self.done.get_nowait(), 'DONE')
1619     self._addThread(target=self._doLock,
1620                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1621     self.assertRaises(Queue.Empty, self.done.get_nowait)
1622     self.GL.release(locking.LEVEL_INSTANCE)
1623     self._waitThreads()
1624     self.assertEqual(self.done.get(True, 1), 'DONE')
1625     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1626
1627
1628 class TestLockMonitor(_ThreadedTestCase):
1629   def setUp(self):
1630     _ThreadedTestCase.setUp(self)
1631     self.lm = locking.LockMonitor()
1632
1633   def testSingleThread(self):
1634     locks = []
1635
1636     for i in range(100):
1637       name = "TestLock%s" % i
1638       locks.append(locking.SharedLock(name, monitor=self.lm))
1639
1640     self.assertEqual(len(self.lm._locks), len(locks))
1641
1642     self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
1643                      100)
1644
1645     # Delete all locks
1646     del locks[:]
1647
1648     # The garbage collector might needs some time
1649     def _CheckLocks():
1650       if self.lm._locks:
1651         raise utils.RetryAgain()
1652
1653     utils.Retry(_CheckLocks, 0.1, 30.0)
1654
1655     self.assertFalse(self.lm._locks)
1656
1657   def testMultiThread(self):
1658     locks = []
1659
1660     def _CreateLock(prev, next, name):
1661       prev.wait()
1662       locks.append(locking.SharedLock(name, monitor=self.lm))
1663       if next:
1664         next.set()
1665
1666     expnames = []
1667
1668     first = threading.Event()
1669     prev = first
1670
1671     # Use a deterministic random generator
1672     for i in random.Random(4263).sample(range(100), 33):
1673       name = "MtTestLock%s" % i
1674       expnames.append(name)
1675
1676       ev = threading.Event()
1677       self._addThread(target=_CreateLock, args=(prev, ev, name))
1678       prev = ev
1679
1680     # Add locks
1681     first.set()
1682     self._waitThreads()
1683
1684     # Check order in which locks were added
1685     self.assertEqual([i.name for i in locks], expnames)
1686
1687     # Sync queries are not supported
1688     self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1689
1690     # Check query result
1691     self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1692                                         False),
1693                      [[name, None, None, []]
1694                       for name in utils.NiceSort(expnames)])
1695
1696     # Test exclusive acquire
1697     for tlock in locks[::4]:
1698       tlock.acquire(shared=0)
1699       try:
1700         def _GetExpResult(name):
1701           if tlock.name == name:
1702             return [name, "exclusive", [threading.currentThread().getName()],
1703                     []]
1704           return [name, None, None, []]
1705
1706         self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1707                                              "pending"], False),
1708                          [_GetExpResult(name)
1709                           for name in utils.NiceSort(expnames)])
1710       finally:
1711         tlock.release()
1712
1713     # Test shared acquire
1714     def _Acquire(lock, shared, ev, notify):
1715       lock.acquire(shared=shared)
1716       try:
1717         notify.set()
1718         ev.wait()
1719       finally:
1720         lock.release()
1721
1722     for tlock1 in locks[::11]:
1723       for tlock2 in locks[::-15]:
1724         if tlock2 == tlock1:
1725           # Avoid deadlocks
1726           continue
1727
1728         for tlock3 in locks[::10]:
1729           if tlock3 in (tlock2, tlock1):
1730             # Avoid deadlocks
1731             continue
1732
1733           releaseev = threading.Event()
1734
1735           # Acquire locks
1736           acquireev = []
1737           tthreads1 = []
1738           for i in range(3):
1739             ev = threading.Event()
1740             tthreads1.append(self._addThread(target=_Acquire,
1741                                              args=(tlock1, 1, releaseev, ev)))
1742             acquireev.append(ev)
1743
1744           ev = threading.Event()
1745           tthread2 = self._addThread(target=_Acquire,
1746                                      args=(tlock2, 1, releaseev, ev))
1747           acquireev.append(ev)
1748
1749           ev = threading.Event()
1750           tthread3 = self._addThread(target=_Acquire,
1751                                      args=(tlock3, 0, releaseev, ev))
1752           acquireev.append(ev)
1753
1754           # Wait for all locks to be acquired
1755           for i in acquireev:
1756             i.wait()
1757
1758           # Check query result
1759           for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1760                                                          "owner"], False):
1761             if name == tlock1.name:
1762               self.assertEqual(mode, "shared")
1763               self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1764               continue
1765
1766             if name == tlock2.name:
1767               self.assertEqual(mode, "shared")
1768               self.assertEqual(owner, [tthread2.getName()])
1769               continue
1770
1771             if name == tlock3.name:
1772               self.assertEqual(mode, "exclusive")
1773               self.assertEqual(owner, [tthread3.getName()])
1774               continue
1775
1776             self.assert_(name in expnames)
1777             self.assert_(mode is None)
1778             self.assert_(owner is None)
1779
1780           # Release locks again
1781           releaseev.set()
1782
1783           self._waitThreads()
1784
1785           self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1786                            [[name, None, None]
1787                             for name in utils.NiceSort(expnames)])
1788
1789   def testDelete(self):
1790     lock = locking.SharedLock("TestLock", monitor=self.lm)
1791
1792     self.assertEqual(len(self.lm._locks), 1)
1793     self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1794                      [[lock.name, None, None]])
1795
1796     lock.delete()
1797
1798     self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1799                      [[lock.name, "deleted", None]])
1800     self.assertEqual(len(self.lm._locks), 1)
1801
1802   def testPending(self):
1803     def _Acquire(lock, shared, prev, next):
1804       prev.wait()
1805
1806       lock.acquire(shared=shared, test_notify=next.set)
1807       try:
1808         pass
1809       finally:
1810         lock.release()
1811
1812     lock = locking.SharedLock("ExcLock", monitor=self.lm)
1813
1814     for shared in [0, 1]:
1815       lock.acquire()
1816       try:
1817         self.assertEqual(len(self.lm._locks), 1)
1818         self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1819                          [[lock.name, "exclusive",
1820                            [threading.currentThread().getName()]]])
1821
1822         threads = []
1823
1824         first = threading.Event()
1825         prev = first
1826
1827         for i in range(5):
1828           ev = threading.Event()
1829           threads.append(self._addThread(target=_Acquire,
1830                                           args=(lock, shared, prev, ev)))
1831           prev = ev
1832
1833         # Start acquires
1834         first.set()
1835
1836         # Wait for last acquire to start waiting
1837         prev.wait()
1838
1839         # NOTE: This works only because QueryLocks will acquire the
1840         # lock-internal lock again and won't be able to get the information
1841         # until it has the lock. By then the acquire should be registered in
1842         # SharedLock.__pending (otherwise it's a bug).
1843
1844         # All acquires are waiting now
1845         if shared:
1846           pending = [("shared", sorted([t.getName() for t in threads]))]
1847         else:
1848           pending = [("exclusive", [t.getName()]) for t in threads]
1849
1850         self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1851                                              "pending"], False),
1852                          [[lock.name, "exclusive",
1853                            [threading.currentThread().getName()],
1854                            pending]])
1855
1856         self.assertEqual(len(self.lm._locks), 1)
1857       finally:
1858         lock.release()
1859
1860       self._waitThreads()
1861
1862       # No pending acquires
1863       self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1864                                           False),
1865                        [[lock.name, None, None, []]])
1866
1867       self.assertEqual(len(self.lm._locks), 1)
1868
1869
1870 if __name__ == '__main__':
1871   testutils.GanetiTestProgram()