Add unittest for cli.FormatResultError
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29 import threading
30 import random
31 import itertools
32
33 from ganeti import constants
34 from ganeti import locking
35 from ganeti import errors
36 from ganeti import utils
37 from ganeti import compat
38 from ganeti import objects
39 from ganeti import query
40
41 import testutils
42
43
44 # This is used to test the ssynchronize decorator.
45 # Since it's passed as input to a decorator it must be declared as a global.
46 _decoratorlock = locking.SharedLock("decorator lock")
47
48 #: List for looping tests
49 ITERATIONS = range(8)
50
51
52 def _Repeat(fn):
53   """Decorator for executing a function many times"""
54   def wrapper(*args, **kwargs):
55     for i in ITERATIONS:
56       fn(*args, **kwargs)
57   return wrapper
58
59
60 def SafeSleep(duration):
61   start = time.time()
62   while True:
63     delay = start + duration - time.time()
64     if delay <= 0.0:
65       break
66     time.sleep(delay)
67
68
69 class _ThreadedTestCase(unittest.TestCase):
70   """Test class that supports adding/waiting on threads"""
71   def setUp(self):
72     unittest.TestCase.setUp(self)
73     self.done = Queue.Queue(0)
74     self.threads = []
75
76   def _addThread(self, *args, **kwargs):
77     """Create and remember a new thread"""
78     t = threading.Thread(*args, **kwargs)
79     self.threads.append(t)
80     t.start()
81     return t
82
83   def _waitThreads(self):
84     """Wait for all our threads to finish"""
85     for t in self.threads:
86       t.join(60)
87       self.failIf(t.isAlive())
88     self.threads = []
89
90
91 class _ConditionTestCase(_ThreadedTestCase):
92   """Common test case for conditions"""
93
94   def setUp(self, cls):
95     _ThreadedTestCase.setUp(self)
96     self.lock = threading.Lock()
97     self.cond = cls(self.lock)
98
99   def _testAcquireRelease(self):
100     self.assertFalse(self.cond._is_owned())
101     self.assertRaises(RuntimeError, self.cond.wait)
102     self.assertRaises(RuntimeError, self.cond.notifyAll)
103
104     self.cond.acquire()
105     self.assert_(self.cond._is_owned())
106     self.cond.notifyAll()
107     self.assert_(self.cond._is_owned())
108     self.cond.release()
109
110     self.assertFalse(self.cond._is_owned())
111     self.assertRaises(RuntimeError, self.cond.wait)
112     self.assertRaises(RuntimeError, self.cond.notifyAll)
113
114   def _testNotification(self):
115     def _NotifyAll():
116       self.done.put("NE")
117       self.cond.acquire()
118       self.done.put("NA")
119       self.cond.notifyAll()
120       self.done.put("NN")
121       self.cond.release()
122
123     self.cond.acquire()
124     self._addThread(target=_NotifyAll)
125     self.assertEqual(self.done.get(True, 1), "NE")
126     self.assertRaises(Queue.Empty, self.done.get_nowait)
127     self.cond.wait()
128     self.assertEqual(self.done.get(True, 1), "NA")
129     self.assertEqual(self.done.get(True, 1), "NN")
130     self.assert_(self.cond._is_owned())
131     self.cond.release()
132     self.assertFalse(self.cond._is_owned())
133
134
135 class TestSingleNotifyPipeCondition(_ConditionTestCase):
136   """SingleNotifyPipeCondition tests"""
137
138   def setUp(self):
139     _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
140
141   def testAcquireRelease(self):
142     self._testAcquireRelease()
143
144   def testNotification(self):
145     self._testNotification()
146
147   def testWaitReuse(self):
148     self.cond.acquire()
149     self.cond.wait(0)
150     self.cond.wait(0.1)
151     self.cond.release()
152
153   def testNoNotifyReuse(self):
154     self.cond.acquire()
155     self.cond.notifyAll()
156     self.assertRaises(RuntimeError, self.cond.wait)
157     self.assertRaises(RuntimeError, self.cond.notifyAll)
158     self.cond.release()
159
160
161 class TestPipeCondition(_ConditionTestCase):
162   """PipeCondition tests"""
163
164   def setUp(self):
165     _ConditionTestCase.setUp(self, locking.PipeCondition)
166
167   def testAcquireRelease(self):
168     self._testAcquireRelease()
169
170   def testNotification(self):
171     self._testNotification()
172
173   def _TestWait(self, fn):
174     threads = [
175       self._addThread(target=fn),
176       self._addThread(target=fn),
177       self._addThread(target=fn),
178       ]
179
180     # Wait for threads to be waiting
181     for _ in threads:
182       self.assertEqual(self.done.get(True, 1), "A")
183
184     self.assertRaises(Queue.Empty, self.done.get_nowait)
185
186     self.cond.acquire()
187     self.assertEqual(len(self.cond._waiters), 3)
188     self.assertEqual(self.cond._waiters, set(threads))
189     # This new thread can't acquire the lock, and thus call wait, before we
190     # release it
191     self._addThread(target=fn)
192     self.cond.notifyAll()
193     self.assertRaises(Queue.Empty, self.done.get_nowait)
194     self.cond.release()
195
196     # We should now get 3 W and 1 A (for the new thread) in whatever order
197     w = 0
198     a = 0
199     for i in range(4):
200       got = self.done.get(True, 1)
201       if got == "W":
202         w += 1
203       elif got == "A":
204         a += 1
205       else:
206         self.fail("Got %s on the done queue" % got)
207
208     self.assertEqual(w, 3)
209     self.assertEqual(a, 1)
210
211     self.cond.acquire()
212     self.cond.notifyAll()
213     self.cond.release()
214     self._waitThreads()
215     self.assertEqual(self.done.get_nowait(), "W")
216     self.assertRaises(Queue.Empty, self.done.get_nowait)
217
218   def testBlockingWait(self):
219     def _BlockingWait():
220       self.cond.acquire()
221       self.done.put("A")
222       self.cond.wait()
223       self.cond.release()
224       self.done.put("W")
225
226     self._TestWait(_BlockingWait)
227
228   def testLongTimeoutWait(self):
229     def _Helper():
230       self.cond.acquire()
231       self.done.put("A")
232       self.cond.wait(15.0)
233       self.cond.release()
234       self.done.put("W")
235
236     self._TestWait(_Helper)
237
238   def _TimeoutWait(self, timeout, check):
239     self.cond.acquire()
240     self.cond.wait(timeout)
241     self.cond.release()
242     self.done.put(check)
243
244   def testShortTimeoutWait(self):
245     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
246     self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247     self._waitThreads()
248     self.assertEqual(self.done.get_nowait(), "T1")
249     self.assertEqual(self.done.get_nowait(), "T1")
250     self.assertRaises(Queue.Empty, self.done.get_nowait)
251
252   def testZeroTimeoutWait(self):
253     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
254     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255     self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256     self._waitThreads()
257     self.assertEqual(self.done.get_nowait(), "T0")
258     self.assertEqual(self.done.get_nowait(), "T0")
259     self.assertEqual(self.done.get_nowait(), "T0")
260     self.assertRaises(Queue.Empty, self.done.get_nowait)
261
262
263 class TestSharedLock(_ThreadedTestCase):
264   """SharedLock tests"""
265
266   def setUp(self):
267     _ThreadedTestCase.setUp(self)
268     self.sl = locking.SharedLock("TestSharedLock")
269
270   def testSequenceAndOwnership(self):
271     self.assertFalse(self.sl._is_owned())
272     self.sl.acquire(shared=1)
273     self.assert_(self.sl._is_owned())
274     self.assert_(self.sl._is_owned(shared=1))
275     self.assertFalse(self.sl._is_owned(shared=0))
276     self.sl.release()
277     self.assertFalse(self.sl._is_owned())
278     self.sl.acquire()
279     self.assert_(self.sl._is_owned())
280     self.assertFalse(self.sl._is_owned(shared=1))
281     self.assert_(self.sl._is_owned(shared=0))
282     self.sl.release()
283     self.assertFalse(self.sl._is_owned())
284     self.sl.acquire(shared=1)
285     self.assert_(self.sl._is_owned())
286     self.assert_(self.sl._is_owned(shared=1))
287     self.assertFalse(self.sl._is_owned(shared=0))
288     self.sl.release()
289     self.assertFalse(self.sl._is_owned())
290
291   def testBooleanValue(self):
292     # semaphores are supposed to return a true value on a successful acquire
293     self.assert_(self.sl.acquire(shared=1))
294     self.sl.release()
295     self.assert_(self.sl.acquire())
296     self.sl.release()
297
298   def testDoubleLockingStoE(self):
299     self.sl.acquire(shared=1)
300     self.assertRaises(AssertionError, self.sl.acquire)
301
302   def testDoubleLockingEtoS(self):
303     self.sl.acquire()
304     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
305
306   def testDoubleLockingStoS(self):
307     self.sl.acquire(shared=1)
308     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
309
310   def testDoubleLockingEtoE(self):
311     self.sl.acquire()
312     self.assertRaises(AssertionError, self.sl.acquire)
313
314   # helper functions: called in a separate thread they acquire the lock, send
315   # their identifier on the done queue, then release it.
316   def _doItSharer(self):
317     try:
318       self.sl.acquire(shared=1)
319       self.done.put('SHR')
320       self.sl.release()
321     except errors.LockError:
322       self.done.put('ERR')
323
324   def _doItExclusive(self):
325     try:
326       self.sl.acquire()
327       self.done.put('EXC')
328       self.sl.release()
329     except errors.LockError:
330       self.done.put('ERR')
331
332   def _doItDelete(self):
333     try:
334       self.sl.delete()
335       self.done.put('DEL')
336     except errors.LockError:
337       self.done.put('ERR')
338
339   def testSharersCanCoexist(self):
340     self.sl.acquire(shared=1)
341     threading.Thread(target=self._doItSharer).start()
342     self.assert_(self.done.get(True, 1))
343     self.sl.release()
344
345   @_Repeat
346   def testExclusiveBlocksExclusive(self):
347     self.sl.acquire()
348     self._addThread(target=self._doItExclusive)
349     self.assertRaises(Queue.Empty, self.done.get_nowait)
350     self.sl.release()
351     self._waitThreads()
352     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
353
354   @_Repeat
355   def testExclusiveBlocksDelete(self):
356     self.sl.acquire()
357     self._addThread(target=self._doItDelete)
358     self.assertRaises(Queue.Empty, self.done.get_nowait)
359     self.sl.release()
360     self._waitThreads()
361     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
362     self.sl = locking.SharedLock(self.sl.name)
363
364   @_Repeat
365   def testExclusiveBlocksSharer(self):
366     self.sl.acquire()
367     self._addThread(target=self._doItSharer)
368     self.assertRaises(Queue.Empty, self.done.get_nowait)
369     self.sl.release()
370     self._waitThreads()
371     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
372
373   @_Repeat
374   def testSharerBlocksExclusive(self):
375     self.sl.acquire(shared=1)
376     self._addThread(target=self._doItExclusive)
377     self.assertRaises(Queue.Empty, self.done.get_nowait)
378     self.sl.release()
379     self._waitThreads()
380     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
381
382   @_Repeat
383   def testSharerBlocksDelete(self):
384     self.sl.acquire(shared=1)
385     self._addThread(target=self._doItDelete)
386     self.assertRaises(Queue.Empty, self.done.get_nowait)
387     self.sl.release()
388     self._waitThreads()
389     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
390     self.sl = locking.SharedLock(self.sl.name)
391
392   @_Repeat
393   def testWaitingExclusiveBlocksSharer(self):
394     """SKIPPED testWaitingExclusiveBlockSharer"""
395     return
396
397     self.sl.acquire(shared=1)
398     # the lock is acquired in shared mode...
399     self._addThread(target=self._doItExclusive)
400     # ...but now an exclusive is waiting...
401     self._addThread(target=self._doItSharer)
402     # ...so the sharer should be blocked as well
403     self.assertRaises(Queue.Empty, self.done.get_nowait)
404     self.sl.release()
405     self._waitThreads()
406     # The exclusive passed before
407     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
408     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
409
410   @_Repeat
411   def testWaitingSharerBlocksExclusive(self):
412     """SKIPPED testWaitingSharerBlocksExclusive"""
413     return
414
415     self.sl.acquire()
416     # the lock is acquired in exclusive mode...
417     self._addThread(target=self._doItSharer)
418     # ...but now a sharer is waiting...
419     self._addThread(target=self._doItExclusive)
420     # ...the exclusive is waiting too...
421     self.assertRaises(Queue.Empty, self.done.get_nowait)
422     self.sl.release()
423     self._waitThreads()
424     # The sharer passed before
425     self.assertEqual(self.done.get_nowait(), 'SHR')
426     self.assertEqual(self.done.get_nowait(), 'EXC')
427
428   def testDelete(self):
429     self.sl.delete()
430     self.assertRaises(errors.LockError, self.sl.acquire)
431     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
432     self.assertRaises(errors.LockError, self.sl.delete)
433
434   def testDeleteTimeout(self):
435     self.sl.delete(timeout=60)
436
437   def testNoDeleteIfSharer(self):
438     self.sl.acquire(shared=1)
439     self.assertRaises(AssertionError, self.sl.delete)
440
441   @_Repeat
442   def testDeletePendingSharersExclusiveDelete(self):
443     self.sl.acquire()
444     self._addThread(target=self._doItSharer)
445     self._addThread(target=self._doItSharer)
446     self._addThread(target=self._doItExclusive)
447     self._addThread(target=self._doItDelete)
448     self.sl.delete()
449     self._waitThreads()
450     # The threads who were pending return ERR
451     for _ in range(4):
452       self.assertEqual(self.done.get_nowait(), 'ERR')
453     self.sl = locking.SharedLock(self.sl.name)
454
455   @_Repeat
456   def testDeletePendingDeleteExclusiveSharers(self):
457     self.sl.acquire()
458     self._addThread(target=self._doItDelete)
459     self._addThread(target=self._doItExclusive)
460     self._addThread(target=self._doItSharer)
461     self._addThread(target=self._doItSharer)
462     self.sl.delete()
463     self._waitThreads()
464     # The two threads who were pending return both ERR
465     self.assertEqual(self.done.get_nowait(), 'ERR')
466     self.assertEqual(self.done.get_nowait(), 'ERR')
467     self.assertEqual(self.done.get_nowait(), 'ERR')
468     self.assertEqual(self.done.get_nowait(), 'ERR')
469     self.sl = locking.SharedLock(self.sl.name)
470
471   @_Repeat
472   def testExclusiveAcquireTimeout(self):
473     for shared in [0, 1]:
474       on_queue = threading.Event()
475       release_exclusive = threading.Event()
476
477       def _LockExclusive():
478         self.sl.acquire(shared=0, test_notify=on_queue.set)
479         self.done.put("A: start wait")
480         release_exclusive.wait()
481         self.done.put("A: end wait")
482         self.sl.release()
483
484       # Start thread to hold lock in exclusive mode
485       self._addThread(target=_LockExclusive)
486
487       # Wait for wait to begin
488       self.assertEqual(self.done.get(timeout=60), "A: start wait")
489
490       # Wait up to 60s to get lock, but release exclusive lock as soon as we're
491       # on the queue
492       self.failUnless(self.sl.acquire(shared=shared, timeout=60,
493                                       test_notify=release_exclusive.set))
494
495       self.done.put("got 2nd")
496       self.sl.release()
497
498       self._waitThreads()
499
500       self.assertEqual(self.done.get_nowait(), "A: end wait")
501       self.assertEqual(self.done.get_nowait(), "got 2nd")
502       self.assertRaises(Queue.Empty, self.done.get_nowait)
503
504   @_Repeat
505   def testAcquireExpiringTimeout(self):
506     def _AcquireWithTimeout(shared, timeout):
507       if not self.sl.acquire(shared=shared, timeout=timeout):
508         self.done.put("timeout")
509
510     for shared in [0, 1]:
511       # Lock exclusively
512       self.sl.acquire()
513
514       # Start shared acquires with timeout between 0 and 20 ms
515       for i in range(11):
516         self._addThread(target=_AcquireWithTimeout,
517                         args=(shared, i * 2.0 / 1000.0))
518
519       # Wait for threads to finish (makes sure the acquire timeout expires
520       # before releasing the lock)
521       self._waitThreads()
522
523       # Release lock
524       self.sl.release()
525
526       for _ in range(11):
527         self.assertEqual(self.done.get_nowait(), "timeout")
528
529       self.assertRaises(Queue.Empty, self.done.get_nowait)
530
531   @_Repeat
532   def testSharedSkipExclusiveAcquires(self):
533     # Tests whether shared acquires jump in front of exclusive acquires in the
534     # queue.
535
536     def _Acquire(shared, name, notify_ev, wait_ev):
537       if notify_ev:
538         notify_fn = notify_ev.set
539       else:
540         notify_fn = None
541
542       if wait_ev:
543         wait_ev.wait()
544
545       if not self.sl.acquire(shared=shared, test_notify=notify_fn):
546         return
547
548       self.done.put(name)
549       self.sl.release()
550
551     # Get exclusive lock while we fill the queue
552     self.sl.acquire()
553
554     shrcnt1 = 5
555     shrcnt2 = 7
556     shrcnt3 = 9
557     shrcnt4 = 2
558
559     # Add acquires using threading.Event for synchronization. They'll be
560     # acquired exactly in the order defined in this list.
561     acquires = (shrcnt1 * [(1, "shared 1")] +
562                 3 * [(0, "exclusive 1")] +
563                 shrcnt2 * [(1, "shared 2")] +
564                 shrcnt3 * [(1, "shared 3")] +
565                 shrcnt4 * [(1, "shared 4")] +
566                 3 * [(0, "exclusive 2")])
567
568     ev_cur = None
569     ev_prev = None
570
571     for args in acquires:
572       ev_cur = threading.Event()
573       self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
574       ev_prev = ev_cur
575
576     # Wait for last acquire to start
577     ev_prev.wait()
578
579     # Expect 6 pending exclusive acquires and 1 for all shared acquires
580     # together
581     self.assertEqual(self.sl._count_pending(), 7)
582
583     # Release exclusive lock and wait
584     self.sl.release()
585
586     self._waitThreads()
587
588     # Check sequence
589     for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
590       # Shared locks aren't guaranteed to be notified in order, but they'll be
591       # first
592       tmp = self.done.get_nowait()
593       if tmp == "shared 1":
594         shrcnt1 -= 1
595       elif tmp == "shared 2":
596         shrcnt2 -= 1
597       elif tmp == "shared 3":
598         shrcnt3 -= 1
599       elif tmp == "shared 4":
600         shrcnt4 -= 1
601     self.assertEqual(shrcnt1, 0)
602     self.assertEqual(shrcnt2, 0)
603     self.assertEqual(shrcnt3, 0)
604     self.assertEqual(shrcnt3, 0)
605
606     for _ in range(3):
607       self.assertEqual(self.done.get_nowait(), "exclusive 1")
608
609     for _ in range(3):
610       self.assertEqual(self.done.get_nowait(), "exclusive 2")
611
612     self.assertRaises(Queue.Empty, self.done.get_nowait)
613
614   @_Repeat
615   def testMixedAcquireTimeout(self):
616     sync = threading.Event()
617
618     def _AcquireShared(ev):
619       if not self.sl.acquire(shared=1, timeout=None):
620         return
621
622       self.done.put("shared")
623
624       # Notify main thread
625       ev.set()
626
627       # Wait for notification from main thread
628       sync.wait()
629
630       # Release lock
631       self.sl.release()
632
633     acquires = []
634     for _ in range(3):
635       ev = threading.Event()
636       self._addThread(target=_AcquireShared, args=(ev, ))
637       acquires.append(ev)
638
639     # Wait for all acquires to finish
640     for i in acquires:
641       i.wait()
642
643     self.assertEqual(self.sl._count_pending(), 0)
644
645     # Try to get exclusive lock
646     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
647
648     # Acquire exclusive without timeout
649     exclsync = threading.Event()
650     exclev = threading.Event()
651
652     def _AcquireExclusive():
653       if not self.sl.acquire(shared=0):
654         return
655
656       self.done.put("exclusive")
657
658       # Notify main thread
659       exclev.set()
660
661       # Wait for notification from main thread
662       exclsync.wait()
663
664       self.sl.release()
665
666     self._addThread(target=_AcquireExclusive)
667
668     # Try to get exclusive lock
669     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
670
671     # Make all shared holders release their locks
672     sync.set()
673
674     # Wait for exclusive acquire to succeed
675     exclev.wait()
676
677     self.assertEqual(self.sl._count_pending(), 0)
678
679     # Try to get exclusive lock
680     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
681
682     def _AcquireSharedSimple():
683       if self.sl.acquire(shared=1, timeout=None):
684         self.done.put("shared2")
685         self.sl.release()
686
687     for _ in range(10):
688       self._addThread(target=_AcquireSharedSimple)
689
690     # Tell exclusive lock to release
691     exclsync.set()
692
693     # Wait for everything to finish
694     self._waitThreads()
695
696     self.assertEqual(self.sl._count_pending(), 0)
697
698     # Check sequence
699     for _ in range(3):
700       self.assertEqual(self.done.get_nowait(), "shared")
701
702     self.assertEqual(self.done.get_nowait(), "exclusive")
703
704     for _ in range(10):
705       self.assertEqual(self.done.get_nowait(), "shared2")
706
707     self.assertRaises(Queue.Empty, self.done.get_nowait)
708
709   def testPriority(self):
710     # Acquire in exclusive mode
711     self.assert_(self.sl.acquire(shared=0))
712
713     # Queue acquires
714     def _Acquire(prev, next, shared, priority, result):
715       prev.wait()
716       self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
717       try:
718         self.done.put(result)
719       finally:
720         self.sl.release()
721
722     counter = itertools.count(0)
723     priorities = range(-20, 30)
724     first = threading.Event()
725     prev = first
726
727     # Data structure:
728     # {
729     #   priority:
730     #     [(shared/exclusive, set(acquire names), set(pending threads)),
731     #      (shared/exclusive, ...),
732     #      ...,
733     #     ],
734     # }
735     perprio = {}
736
737     # References shared acquire per priority in L{perprio}. Data structure:
738     # {
739     #   priority: (shared=1, set(acquire names), set(pending threads)),
740     # }
741     prioshared = {}
742
743     for seed in [4979, 9523, 14902, 32440]:
744       # Use a deterministic random generator
745       rnd = random.Random(seed)
746       for priority in [rnd.choice(priorities) for _ in range(30)]:
747         modes = [0, 1]
748         rnd.shuffle(modes)
749         for shared in modes:
750           # Unique name
751           acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
752
753           ev = threading.Event()
754           thread = self._addThread(target=_Acquire,
755                                    args=(prev, ev, shared, priority, acqname))
756           prev = ev
757
758           # Record expected aqcuire, see above for structure
759           data = (shared, set([acqname]), set([thread]))
760           priolist = perprio.setdefault(priority, [])
761           if shared:
762             priosh = prioshared.get(priority, None)
763             if priosh:
764               # Shared acquires are merged
765               for i, j in zip(priosh[1:], data[1:]):
766                 i.update(j)
767               assert data[0] == priosh[0]
768             else:
769               prioshared[priority] = data
770               priolist.append(data)
771           else:
772             priolist.append(data)
773
774     # Start all acquires and wait for them
775     first.set()
776     prev.wait()
777
778     # Check lock information
779     self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
780     self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
781                      (self.sl.name, "exclusive",
782                       [threading.currentThread().getName()], None))
783
784     self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
785
786     # Let threads acquire the lock
787     self.sl.release()
788
789     # Wait for everything to finish
790     self._waitThreads()
791
792     self.assert_(self.sl._check_empty())
793
794     # Check acquires by priority
795     for acquires in [perprio[i] for i in sorted(perprio.keys())]:
796       for (_, names, _) in acquires:
797         # For shared acquires, the set will contain 1..n entries. For exclusive
798         # acquires only one.
799         while names:
800           names.remove(self.done.get_nowait())
801       self.assertFalse(compat.any(names for (_, names, _) in acquires))
802
803     self.assertRaises(Queue.Empty, self.done.get_nowait)
804
805   def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
806     self.assertEqual(name, self.sl.name)
807     self.assert_(mode is None)
808     self.assert_(owner is None)
809
810     self.assertEqual([(pendmode, sorted(waiting))
811                       for (pendmode, waiting) in pending],
812                      [(["exclusive", "shared"][int(bool(shared))],
813                        sorted(t.getName() for t in threads))
814                       for acquires in [perprio[i]
815                                        for i in sorted(perprio.keys())]
816                       for (shared, _, threads) in acquires])
817
818
819 class TestSharedLockInCondition(_ThreadedTestCase):
820   """SharedLock as a condition lock tests"""
821
822   def setUp(self):
823     _ThreadedTestCase.setUp(self)
824     self.sl = locking.SharedLock("TestSharedLockInCondition")
825     self.setCondition()
826
827   def setCondition(self):
828     self.cond = threading.Condition(self.sl)
829
830   def testKeepMode(self):
831     self.cond.acquire(shared=1)
832     self.assert_(self.sl._is_owned(shared=1))
833     self.cond.wait(0)
834     self.assert_(self.sl._is_owned(shared=1))
835     self.cond.release()
836     self.cond.acquire(shared=0)
837     self.assert_(self.sl._is_owned(shared=0))
838     self.cond.wait(0)
839     self.assert_(self.sl._is_owned(shared=0))
840     self.cond.release()
841
842
843 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
844   """SharedLock as a pipe condition lock tests"""
845
846   def setCondition(self):
847     self.cond = locking.PipeCondition(self.sl)
848
849
850 class TestSSynchronizedDecorator(_ThreadedTestCase):
851   """Shared Lock Synchronized decorator test"""
852
853   def setUp(self):
854     _ThreadedTestCase.setUp(self)
855
856   @locking.ssynchronized(_decoratorlock)
857   def _doItExclusive(self):
858     self.assert_(_decoratorlock._is_owned())
859     self.done.put('EXC')
860
861   @locking.ssynchronized(_decoratorlock, shared=1)
862   def _doItSharer(self):
863     self.assert_(_decoratorlock._is_owned(shared=1))
864     self.done.put('SHR')
865
866   def testDecoratedFunctions(self):
867     self._doItExclusive()
868     self.assertFalse(_decoratorlock._is_owned())
869     self._doItSharer()
870     self.assertFalse(_decoratorlock._is_owned())
871
872   def testSharersCanCoexist(self):
873     _decoratorlock.acquire(shared=1)
874     threading.Thread(target=self._doItSharer).start()
875     self.assert_(self.done.get(True, 1))
876     _decoratorlock.release()
877
878   @_Repeat
879   def testExclusiveBlocksExclusive(self):
880     _decoratorlock.acquire()
881     self._addThread(target=self._doItExclusive)
882     # give it a bit of time to check that it's not actually doing anything
883     self.assertRaises(Queue.Empty, self.done.get_nowait)
884     _decoratorlock.release()
885     self._waitThreads()
886     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
887
888   @_Repeat
889   def testExclusiveBlocksSharer(self):
890     _decoratorlock.acquire()
891     self._addThread(target=self._doItSharer)
892     self.assertRaises(Queue.Empty, self.done.get_nowait)
893     _decoratorlock.release()
894     self._waitThreads()
895     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
896
897   @_Repeat
898   def testSharerBlocksExclusive(self):
899     _decoratorlock.acquire(shared=1)
900     self._addThread(target=self._doItExclusive)
901     self.assertRaises(Queue.Empty, self.done.get_nowait)
902     _decoratorlock.release()
903     self._waitThreads()
904     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
905
906
907 class TestLockSet(_ThreadedTestCase):
908   """LockSet tests"""
909
910   def setUp(self):
911     _ThreadedTestCase.setUp(self)
912     self._setUpLS()
913
914   def _setUpLS(self):
915     """Helper to (re)initialize the lock set"""
916     self.resources = ['one', 'two', 'three']
917     self.ls = locking.LockSet(self.resources, "TestLockSet")
918
919   def testResources(self):
920     self.assertEquals(self.ls._names(), set(self.resources))
921     newls = locking.LockSet([], "TestLockSet.testResources")
922     self.assertEquals(newls._names(), set())
923
924   def testAcquireRelease(self):
925     self.assert_(self.ls.acquire('one'))
926     self.assertEquals(self.ls._list_owned(), set(['one']))
927     self.ls.release()
928     self.assertEquals(self.ls._list_owned(), set())
929     self.assertEquals(self.ls.acquire(['one']), set(['one']))
930     self.assertEquals(self.ls._list_owned(), set(['one']))
931     self.ls.release()
932     self.assertEquals(self.ls._list_owned(), set())
933     self.ls.acquire(['one', 'two', 'three'])
934     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
935     self.ls.release('one')
936     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
937     self.ls.release(['three'])
938     self.assertEquals(self.ls._list_owned(), set(['two']))
939     self.ls.release()
940     self.assertEquals(self.ls._list_owned(), set())
941     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
942     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
943     self.ls.release()
944     self.assertEquals(self.ls._list_owned(), set())
945
946   def testNoDoubleAcquire(self):
947     self.ls.acquire('one')
948     self.assertRaises(AssertionError, self.ls.acquire, 'one')
949     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
950     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
951     self.ls.release()
952     self.ls.acquire(['one', 'three'])
953     self.ls.release('one')
954     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
955     self.ls.release('three')
956
957   def testNoWrongRelease(self):
958     self.assertRaises(AssertionError, self.ls.release)
959     self.ls.acquire('one')
960     self.assertRaises(AssertionError, self.ls.release, 'two')
961
962   def testAddRemove(self):
963     self.ls.add('four')
964     self.assertEquals(self.ls._list_owned(), set())
965     self.assert_('four' in self.ls._names())
966     self.ls.add(['five', 'six', 'seven'], acquired=1)
967     self.assert_('five' in self.ls._names())
968     self.assert_('six' in self.ls._names())
969     self.assert_('seven' in self.ls._names())
970     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
971     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
972     self.assert_('five' not in self.ls._names())
973     self.assert_('six' not in self.ls._names())
974     self.assertEquals(self.ls._list_owned(), set(['seven']))
975     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
976     self.ls.remove('seven')
977     self.assert_('seven' not in self.ls._names())
978     self.assertEquals(self.ls._list_owned(), set([]))
979     self.ls.acquire(None, shared=1)
980     self.assertRaises(AssertionError, self.ls.add, 'eight')
981     self.ls.release()
982     self.ls.acquire(None)
983     self.ls.add('eight', acquired=1)
984     self.assert_('eight' in self.ls._names())
985     self.assert_('eight' in self.ls._list_owned())
986     self.ls.add('nine')
987     self.assert_('nine' in self.ls._names())
988     self.assert_('nine' not in self.ls._list_owned())
989     self.ls.release()
990     self.ls.remove(['two'])
991     self.assert_('two' not in self.ls._names())
992     self.ls.acquire('three')
993     self.assertEquals(self.ls.remove(['three']), ['three'])
994     self.assert_('three' not in self.ls._names())
995     self.assertEquals(self.ls.remove('three'), [])
996     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
997     self.assert_('one' not in self.ls._names())
998
999   def testRemoveNonBlocking(self):
1000     self.ls.acquire('one')
1001     self.assertEquals(self.ls.remove('one'), ['one'])
1002     self.ls.acquire(['two', 'three'])
1003     self.assertEquals(self.ls.remove(['two', 'three']),
1004                       ['two', 'three'])
1005
1006   def testNoDoubleAdd(self):
1007     self.assertRaises(errors.LockError, self.ls.add, 'two')
1008     self.ls.add('four')
1009     self.assertRaises(errors.LockError, self.ls.add, 'four')
1010
1011   def testNoWrongRemoves(self):
1012     self.ls.acquire(['one', 'three'], shared=1)
1013     # Cannot remove 'two' while holding something which is not a superset
1014     self.assertRaises(AssertionError, self.ls.remove, 'two')
1015     # Cannot remove 'three' as we are sharing it
1016     self.assertRaises(AssertionError, self.ls.remove, 'three')
1017
1018   def testAcquireSetLock(self):
1019     # acquire the set-lock exclusively
1020     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1021     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1022     self.assertEquals(self.ls._is_owned(), True)
1023     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1024     # I can still add/remove elements...
1025     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1026     self.assert_(self.ls.add('six'))
1027     self.ls.release()
1028     # share the set-lock
1029     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1030     # adding new elements is not possible
1031     self.assertRaises(AssertionError, self.ls.add, 'five')
1032     self.ls.release()
1033
1034   def testAcquireWithRepetitions(self):
1035     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1036                       set(['two', 'two', 'three']))
1037     self.ls.release(['two', 'two'])
1038     self.assertEquals(self.ls._list_owned(), set(['three']))
1039
1040   def testEmptyAcquire(self):
1041     # Acquire an empty list of locks...
1042     self.assertEquals(self.ls.acquire([]), set())
1043     self.assertEquals(self.ls._list_owned(), set())
1044     # New locks can still be addded
1045     self.assert_(self.ls.add('six'))
1046     # "re-acquiring" is not an issue, since we had really acquired nothing
1047     self.assertEquals(self.ls.acquire([], shared=1), set())
1048     self.assertEquals(self.ls._list_owned(), set())
1049     # We haven't really acquired anything, so we cannot release
1050     self.assertRaises(AssertionError, self.ls.release)
1051
1052   def _doLockSet(self, names, shared):
1053     try:
1054       self.ls.acquire(names, shared=shared)
1055       self.done.put('DONE')
1056       self.ls.release()
1057     except errors.LockError:
1058       self.done.put('ERR')
1059
1060   def _doAddSet(self, names):
1061     try:
1062       self.ls.add(names, acquired=1)
1063       self.done.put('DONE')
1064       self.ls.release()
1065     except errors.LockError:
1066       self.done.put('ERR')
1067
1068   def _doRemoveSet(self, names):
1069     self.done.put(self.ls.remove(names))
1070
1071   @_Repeat
1072   def testConcurrentSharedAcquire(self):
1073     self.ls.acquire(['one', 'two'], shared=1)
1074     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1075     self._waitThreads()
1076     self.assertEqual(self.done.get_nowait(), 'DONE')
1077     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1078     self._waitThreads()
1079     self.assertEqual(self.done.get_nowait(), 'DONE')
1080     self._addThread(target=self._doLockSet, args=('three', 1))
1081     self._waitThreads()
1082     self.assertEqual(self.done.get_nowait(), 'DONE')
1083     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1084     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1085     self.assertRaises(Queue.Empty, self.done.get_nowait)
1086     self.ls.release()
1087     self._waitThreads()
1088     self.assertEqual(self.done.get_nowait(), 'DONE')
1089     self.assertEqual(self.done.get_nowait(), 'DONE')
1090
1091   @_Repeat
1092   def testConcurrentExclusiveAcquire(self):
1093     self.ls.acquire(['one', 'two'])
1094     self._addThread(target=self._doLockSet, args=('three', 1))
1095     self._waitThreads()
1096     self.assertEqual(self.done.get_nowait(), 'DONE')
1097     self._addThread(target=self._doLockSet, args=('three', 0))
1098     self._waitThreads()
1099     self.assertEqual(self.done.get_nowait(), 'DONE')
1100     self.assertRaises(Queue.Empty, self.done.get_nowait)
1101     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1102     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1103     self._addThread(target=self._doLockSet, args=('one', 0))
1104     self._addThread(target=self._doLockSet, args=('one', 1))
1105     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1106     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1107     self.assertRaises(Queue.Empty, self.done.get_nowait)
1108     self.ls.release()
1109     self._waitThreads()
1110     for _ in range(6):
1111       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1112
1113   @_Repeat
1114   def testSimpleAcquireTimeoutExpiring(self):
1115     names = sorted(self.ls._names())
1116     self.assert_(len(names) >= 3)
1117
1118     # Get name of first lock
1119     first = names[0]
1120
1121     # Get name of last lock
1122     last = names.pop()
1123
1124     checks = [
1125       # Block first and try to lock it again
1126       (first, first),
1127
1128       # Block last and try to lock all locks
1129       (None, first),
1130
1131       # Block last and try to lock it again
1132       (last, last),
1133       ]
1134
1135     for (wanted, block) in checks:
1136       # Lock in exclusive mode
1137       self.assert_(self.ls.acquire(block, shared=0))
1138
1139       def _AcquireOne():
1140         # Try to get the same lock again with a timeout (should never succeed)
1141         acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1142         if acquired:
1143           self.done.put("acquired")
1144           self.ls.release()
1145         else:
1146           self.assert_(acquired is None)
1147           self.assertFalse(self.ls._list_owned())
1148           self.assertFalse(self.ls._is_owned())
1149           self.done.put("not acquired")
1150
1151       self._addThread(target=_AcquireOne)
1152
1153       # Wait for timeout in thread to expire
1154       self._waitThreads()
1155
1156       # Release exclusive lock again
1157       self.ls.release()
1158
1159       self.assertEqual(self.done.get_nowait(), "not acquired")
1160       self.assertRaises(Queue.Empty, self.done.get_nowait)
1161
1162   @_Repeat
1163   def testDelayedAndExpiringLockAcquire(self):
1164     self._setUpLS()
1165     self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1166
1167     for expire in (False, True):
1168       names = sorted(self.ls._names())
1169       self.assertEqual(len(names), 8)
1170
1171       lock_ev = dict([(i, threading.Event()) for i in names])
1172
1173       # Lock all in exclusive mode
1174       self.assert_(self.ls.acquire(names, shared=0))
1175
1176       if expire:
1177         # We'll wait at least 300ms per lock
1178         lockwait = len(names) * [0.3]
1179
1180         # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1181         # this gives us up to 2.4s to fail.
1182         lockall_timeout = 0.4
1183       else:
1184         # This should finish rather quickly
1185         lockwait = None
1186         lockall_timeout = len(names) * 5.0
1187
1188       def _LockAll():
1189         def acquire_notification(name):
1190           if not expire:
1191             self.done.put("getting %s" % name)
1192
1193           # Kick next lock
1194           lock_ev[name].set()
1195
1196         if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1197                            test_notify=acquire_notification):
1198           self.done.put("got all")
1199           self.ls.release()
1200         else:
1201           self.done.put("timeout on all")
1202
1203         # Notify all locks
1204         for ev in lock_ev.values():
1205           ev.set()
1206
1207       t = self._addThread(target=_LockAll)
1208
1209       for idx, name in enumerate(names):
1210         # Wait for actual acquire on this lock to start
1211         lock_ev[name].wait(10.0)
1212
1213         if expire and t.isAlive():
1214           # Wait some time after getting the notification to make sure the lock
1215           # acquire will expire
1216           SafeSleep(lockwait[idx])
1217
1218         self.ls.release(names=name)
1219
1220       self.assertFalse(self.ls._list_owned())
1221
1222       self._waitThreads()
1223
1224       if expire:
1225         # Not checking which locks were actually acquired. Doing so would be
1226         # too timing-dependant.
1227         self.assertEqual(self.done.get_nowait(), "timeout on all")
1228       else:
1229         for i in names:
1230           self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1231         self.assertEqual(self.done.get_nowait(), "got all")
1232       self.assertRaises(Queue.Empty, self.done.get_nowait)
1233
1234   @_Repeat
1235   def testConcurrentRemove(self):
1236     self.ls.add('four')
1237     self.ls.acquire(['one', 'two', 'four'])
1238     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1239     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1240     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1241     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1242     self.assertRaises(Queue.Empty, self.done.get_nowait)
1243     self.ls.remove('one')
1244     self.ls.release()
1245     self._waitThreads()
1246     for i in range(4):
1247       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1248     self.ls.add(['five', 'six'], acquired=1)
1249     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1250     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1251     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1252     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1253     self.ls.remove('five')
1254     self.ls.release()
1255     self._waitThreads()
1256     for i in range(4):
1257       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1258     self.ls.acquire(['three', 'four'])
1259     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1260     self.assertRaises(Queue.Empty, self.done.get_nowait)
1261     self.ls.remove('four')
1262     self._waitThreads()
1263     self.assertEqual(self.done.get_nowait(), ['six'])
1264     self._addThread(target=self._doRemoveSet, args=(['two']))
1265     self._waitThreads()
1266     self.assertEqual(self.done.get_nowait(), ['two'])
1267     self.ls.release()
1268     # reset lockset
1269     self._setUpLS()
1270
1271   @_Repeat
1272   def testConcurrentSharedSetLock(self):
1273     # share the set-lock...
1274     self.ls.acquire(None, shared=1)
1275     # ...another thread can share it too
1276     self._addThread(target=self._doLockSet, args=(None, 1))
1277     self._waitThreads()
1278     self.assertEqual(self.done.get_nowait(), 'DONE')
1279     # ...or just share some elements
1280     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1281     self._waitThreads()
1282     self.assertEqual(self.done.get_nowait(), 'DONE')
1283     # ...but not add new ones or remove any
1284     t = self._addThread(target=self._doAddSet, args=(['nine']))
1285     self._addThread(target=self._doRemoveSet, args=(['two'], ))
1286     self.assertRaises(Queue.Empty, self.done.get_nowait)
1287     # this just releases the set-lock
1288     self.ls.release([])
1289     t.join(60)
1290     self.assertEqual(self.done.get_nowait(), 'DONE')
1291     # release the lock on the actual elements so remove() can proceed too
1292     self.ls.release()
1293     self._waitThreads()
1294     self.failUnlessEqual(self.done.get_nowait(), ['two'])
1295     # reset lockset
1296     self._setUpLS()
1297
1298   @_Repeat
1299   def testConcurrentExclusiveSetLock(self):
1300     # acquire the set-lock...
1301     self.ls.acquire(None, shared=0)
1302     # ...no one can do anything else
1303     self._addThread(target=self._doLockSet, args=(None, 1))
1304     self._addThread(target=self._doLockSet, args=(None, 0))
1305     self._addThread(target=self._doLockSet, args=(['three'], 0))
1306     self._addThread(target=self._doLockSet, args=(['two'], 1))
1307     self._addThread(target=self._doAddSet, args=(['nine']))
1308     self.assertRaises(Queue.Empty, self.done.get_nowait)
1309     self.ls.release()
1310     self._waitThreads()
1311     for _ in range(5):
1312       self.assertEqual(self.done.get(True, 1), 'DONE')
1313     # cleanup
1314     self._setUpLS()
1315
1316   @_Repeat
1317   def testConcurrentSetLockAdd(self):
1318     self.ls.acquire('one')
1319     # Another thread wants the whole SetLock
1320     self._addThread(target=self._doLockSet, args=(None, 0))
1321     self._addThread(target=self._doLockSet, args=(None, 1))
1322     self.assertRaises(Queue.Empty, self.done.get_nowait)
1323     self.assertRaises(AssertionError, self.ls.add, 'four')
1324     self.ls.release()
1325     self._waitThreads()
1326     self.assertEqual(self.done.get_nowait(), 'DONE')
1327     self.assertEqual(self.done.get_nowait(), 'DONE')
1328     self.ls.acquire(None)
1329     self._addThread(target=self._doLockSet, args=(None, 0))
1330     self._addThread(target=self._doLockSet, args=(None, 1))
1331     self.assertRaises(Queue.Empty, self.done.get_nowait)
1332     self.ls.add('four')
1333     self.ls.add('five', acquired=1)
1334     self.ls.add('six', acquired=1, shared=1)
1335     self.assertEquals(self.ls._list_owned(),
1336       set(['one', 'two', 'three', 'five', 'six']))
1337     self.assertEquals(self.ls._is_owned(), True)
1338     self.assertEquals(self.ls._names(),
1339       set(['one', 'two', 'three', 'four', 'five', 'six']))
1340     self.ls.release()
1341     self._waitThreads()
1342     self.assertEqual(self.done.get_nowait(), 'DONE')
1343     self.assertEqual(self.done.get_nowait(), 'DONE')
1344     self._setUpLS()
1345
1346   @_Repeat
1347   def testEmptyLockSet(self):
1348     # get the set-lock
1349     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1350     # now empty it...
1351     self.ls.remove(['one', 'two', 'three'])
1352     # and adds/locks by another thread still wait
1353     self._addThread(target=self._doAddSet, args=(['nine']))
1354     self._addThread(target=self._doLockSet, args=(None, 1))
1355     self._addThread(target=self._doLockSet, args=(None, 0))
1356     self.assertRaises(Queue.Empty, self.done.get_nowait)
1357     self.ls.release()
1358     self._waitThreads()
1359     for _ in range(3):
1360       self.assertEqual(self.done.get_nowait(), 'DONE')
1361     # empty it again...
1362     self.assertEqual(self.ls.remove(['nine']), ['nine'])
1363     # now share it...
1364     self.assertEqual(self.ls.acquire(None, shared=1), set())
1365     # other sharers can go, adds still wait
1366     self._addThread(target=self._doLockSet, args=(None, 1))
1367     self._waitThreads()
1368     self.assertEqual(self.done.get_nowait(), 'DONE')
1369     self._addThread(target=self._doAddSet, args=(['nine']))
1370     self.assertRaises(Queue.Empty, self.done.get_nowait)
1371     self.ls.release()
1372     self._waitThreads()
1373     self.assertEqual(self.done.get_nowait(), 'DONE')
1374     self._setUpLS()
1375
1376   def testPriority(self):
1377     def _Acquire(prev, next, name, priority, success_fn):
1378       prev.wait()
1379       self.assert_(self.ls.acquire(name, shared=0,
1380                                    priority=priority,
1381                                    test_notify=lambda _: next.set()))
1382       try:
1383         success_fn()
1384       finally:
1385         self.ls.release()
1386
1387     # Get all in exclusive mode
1388     self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1389
1390     done_two = Queue.Queue(0)
1391
1392     first = threading.Event()
1393     prev = first
1394
1395     acquires = [("one", prio, self.done) for prio in range(1, 33)]
1396     acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1397
1398     # Use a deterministic random generator
1399     random.Random(741).shuffle(acquires)
1400
1401     for (name, prio, done) in acquires:
1402       ev = threading.Event()
1403       self._addThread(target=_Acquire,
1404                       args=(prev, ev, name, prio,
1405                             compat.partial(done.put, "Prio%s" % prio)))
1406       prev = ev
1407
1408     # Start acquires
1409     first.set()
1410
1411     # Wait for last acquire to start
1412     prev.wait()
1413
1414     # Let threads acquire locks
1415     self.ls.release()
1416
1417     # Wait for threads to finish
1418     self._waitThreads()
1419
1420     for i in range(1, 33):
1421       self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1422       self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1423
1424     self.assertRaises(Queue.Empty, self.done.get_nowait)
1425     self.assertRaises(Queue.Empty, done_two.get_nowait)
1426
1427
1428 class TestGanetiLockManager(_ThreadedTestCase):
1429
1430   def setUp(self):
1431     _ThreadedTestCase.setUp(self)
1432     self.nodes=['n1', 'n2']
1433     self.nodegroups=['g1', 'g2']
1434     self.instances=['i1', 'i2', 'i3']
1435     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1436                                         self.instances)
1437
1438   def tearDown(self):
1439     # Don't try this at home...
1440     locking.GanetiLockManager._instance = None
1441
1442   def testLockingConstants(self):
1443     # The locking library internally cheats by assuming its constants have some
1444     # relationships with each other. Check those hold true.
1445     # This relationship is also used in the Processor to recursively acquire
1446     # the right locks. Again, please don't break it.
1447     for i in range(len(locking.LEVELS)):
1448       self.assertEqual(i, locking.LEVELS[i])
1449
1450   def testDoubleGLFails(self):
1451     self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1452
1453   def testLockNames(self):
1454     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1455     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1456     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1457                      set(self.nodegroups))
1458     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1459                      set(self.instances))
1460
1461   def testInitAndResources(self):
1462     locking.GanetiLockManager._instance = None
1463     self.GL = locking.GanetiLockManager([], [], [])
1464     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1465     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1466     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1467     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1468
1469     locking.GanetiLockManager._instance = None
1470     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1471     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1472     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1473     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1474                                     set(self.nodegroups))
1475     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1476
1477     locking.GanetiLockManager._instance = None
1478     self.GL = locking.GanetiLockManager([], [], self.instances)
1479     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1480     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1481     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1482     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1483                      set(self.instances))
1484
1485   def testAcquireRelease(self):
1486     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1487     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1488     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1489     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1490     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1491     self.GL.release(locking.LEVEL_NODE, ['n2'])
1492     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1493     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1494     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1495     self.GL.release(locking.LEVEL_NODE)
1496     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1497     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1498     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1499     self.GL.release(locking.LEVEL_NODEGROUP)
1500     self.GL.release(locking.LEVEL_INSTANCE)
1501     self.assertRaises(errors.LockError, self.GL.acquire,
1502                       locking.LEVEL_INSTANCE, ['i5'])
1503     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1504     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1505
1506   def testAcquireWholeSets(self):
1507     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1508     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1509                       set(self.instances))
1510     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1511                       set(self.instances))
1512     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1513                       set(self.nodegroups))
1514     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1515                       set(self.nodegroups))
1516     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1517                       set(self.nodes))
1518     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1519                       set(self.nodes))
1520     self.GL.release(locking.LEVEL_NODE)
1521     self.GL.release(locking.LEVEL_NODEGROUP)
1522     self.GL.release(locking.LEVEL_INSTANCE)
1523     self.GL.release(locking.LEVEL_CLUSTER)
1524
1525   def testAcquireWholeAndPartial(self):
1526     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1527     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1528                       set(self.instances))
1529     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1530                       set(self.instances))
1531     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1532                       set(['n2']))
1533     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1534                       set(['n2']))
1535     self.GL.release(locking.LEVEL_NODE)
1536     self.GL.release(locking.LEVEL_INSTANCE)
1537     self.GL.release(locking.LEVEL_CLUSTER)
1538
1539   def testBGLDependency(self):
1540     self.assertRaises(AssertionError, self.GL.acquire,
1541                       locking.LEVEL_NODE, ['n1', 'n2'])
1542     self.assertRaises(AssertionError, self.GL.acquire,
1543                       locking.LEVEL_INSTANCE, ['i3'])
1544     self.assertRaises(AssertionError, self.GL.acquire,
1545                       locking.LEVEL_NODEGROUP, ['g1'])
1546     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1547     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1548     self.assertRaises(AssertionError, self.GL.release,
1549                       locking.LEVEL_CLUSTER, ['BGL'])
1550     self.assertRaises(AssertionError, self.GL.release,
1551                       locking.LEVEL_CLUSTER)
1552     self.GL.release(locking.LEVEL_NODE)
1553     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1554     self.assertRaises(AssertionError, self.GL.release,
1555                       locking.LEVEL_CLUSTER, ['BGL'])
1556     self.assertRaises(AssertionError, self.GL.release,
1557                       locking.LEVEL_CLUSTER)
1558     self.GL.release(locking.LEVEL_INSTANCE)
1559     self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1560     self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1561     self.assertRaises(AssertionError, self.GL.release,
1562                       locking.LEVEL_CLUSTER, ['BGL'])
1563     self.assertRaises(AssertionError, self.GL.release,
1564                       locking.LEVEL_CLUSTER)
1565     self.GL.release(locking.LEVEL_NODEGROUP)
1566     self.GL.release(locking.LEVEL_CLUSTER)
1567
1568   def testWrongOrder(self):
1569     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1570     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1571     self.assertRaises(AssertionError, self.GL.acquire,
1572                       locking.LEVEL_NODE, ['n1'])
1573     self.assertRaises(AssertionError, self.GL.acquire,
1574                       locking.LEVEL_NODEGROUP, ['g1'])
1575     self.assertRaises(AssertionError, self.GL.acquire,
1576                       locking.LEVEL_INSTANCE, ['i2'])
1577
1578   def testModifiableLevels(self):
1579     self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1580                       ['BGL2'])
1581     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1582     self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1583     self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1584     self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1585     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1586     self.GL.add(locking.LEVEL_NODE, ['n3'])
1587     self.GL.remove(locking.LEVEL_NODE, ['n1'])
1588     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1589     self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1590     self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1591     self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1592     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1593     self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1594                       ['BGL2'])
1595
1596   # Helper function to run as a thread that shared the BGL and then acquires
1597   # some locks at another level.
1598   def _doLock(self, level, names, shared):
1599     try:
1600       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1601       self.GL.acquire(level, names, shared=shared)
1602       self.done.put('DONE')
1603       self.GL.release(level)
1604       self.GL.release(locking.LEVEL_CLUSTER)
1605     except errors.LockError:
1606       self.done.put('ERR')
1607
1608   @_Repeat
1609   def testConcurrency(self):
1610     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1611     self._addThread(target=self._doLock,
1612                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1613     self._waitThreads()
1614     self.assertEqual(self.done.get_nowait(), 'DONE')
1615     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1616     self._addThread(target=self._doLock,
1617                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
1618     self._waitThreads()
1619     self.assertEqual(self.done.get_nowait(), 'DONE')
1620     self._addThread(target=self._doLock,
1621                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
1622     self.assertRaises(Queue.Empty, self.done.get_nowait)
1623     self.GL.release(locking.LEVEL_INSTANCE)
1624     self._waitThreads()
1625     self.assertEqual(self.done.get_nowait(), 'DONE')
1626     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1627     self._addThread(target=self._doLock,
1628                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
1629     self._waitThreads()
1630     self.assertEqual(self.done.get_nowait(), 'DONE')
1631     self._addThread(target=self._doLock,
1632                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
1633     self.assertRaises(Queue.Empty, self.done.get_nowait)
1634     self.GL.release(locking.LEVEL_INSTANCE)
1635     self._waitThreads()
1636     self.assertEqual(self.done.get(True, 1), 'DONE')
1637     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1638
1639
1640 class TestLockMonitor(_ThreadedTestCase):
1641   def setUp(self):
1642     _ThreadedTestCase.setUp(self)
1643     self.lm = locking.LockMonitor()
1644
1645   def testSingleThread(self):
1646     locks = []
1647
1648     for i in range(100):
1649       name = "TestLock%s" % i
1650       locks.append(locking.SharedLock(name, monitor=self.lm))
1651
1652     self.assertEqual(len(self.lm._locks), len(locks))
1653     result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1654     self.assertEqual(len(result.fields), 1)
1655     self.assertEqual(len(result.data), 100)
1656
1657     # Delete all locks
1658     del locks[:]
1659
1660     # The garbage collector might needs some time
1661     def _CheckLocks():
1662       if self.lm._locks:
1663         raise utils.RetryAgain()
1664
1665     utils.Retry(_CheckLocks, 0.1, 30.0)
1666
1667     self.assertFalse(self.lm._locks)
1668
1669   def testMultiThread(self):
1670     locks = []
1671
1672     def _CreateLock(prev, next, name):
1673       prev.wait()
1674       locks.append(locking.SharedLock(name, monitor=self.lm))
1675       if next:
1676         next.set()
1677
1678     expnames = []
1679
1680     first = threading.Event()
1681     prev = first
1682
1683     # Use a deterministic random generator
1684     for i in random.Random(4263).sample(range(100), 33):
1685       name = "MtTestLock%s" % i
1686       expnames.append(name)
1687
1688       ev = threading.Event()
1689       self._addThread(target=_CreateLock, args=(prev, ev, name))
1690       prev = ev
1691
1692     # Add locks
1693     first.set()
1694     self._waitThreads()
1695
1696     # Check order in which locks were added
1697     self.assertEqual([i.name for i in locks], expnames)
1698
1699     # Check query result
1700     result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1701     self.assert_(isinstance(result, dict))
1702     response = objects.QueryResponse.FromDict(result)
1703     self.assertEqual(response.data,
1704                      [[(constants.RS_NORMAL, name),
1705                        (constants.RS_NORMAL, None),
1706                        (constants.RS_NORMAL, None),
1707                        (constants.RS_NORMAL, [])]
1708                       for name in utils.NiceSort(expnames)])
1709     self.assertEqual(len(response.fields), 4)
1710     self.assertEqual(["name", "mode", "owner", "pending"],
1711                      [fdef.name for fdef in response.fields])
1712
1713     # Test exclusive acquire
1714     for tlock in locks[::4]:
1715       tlock.acquire(shared=0)
1716       try:
1717         def _GetExpResult(name):
1718           if tlock.name == name:
1719             return [(constants.RS_NORMAL, name),
1720                     (constants.RS_NORMAL, "exclusive"),
1721                     (constants.RS_NORMAL,
1722                      [threading.currentThread().getName()]),
1723                     (constants.RS_NORMAL, [])]
1724           return [(constants.RS_NORMAL, name),
1725                   (constants.RS_NORMAL, None),
1726                   (constants.RS_NORMAL, None),
1727                   (constants.RS_NORMAL, [])]
1728
1729         result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1730         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1731                          [_GetExpResult(name)
1732                           for name in utils.NiceSort(expnames)])
1733       finally:
1734         tlock.release()
1735
1736     # Test shared acquire
1737     def _Acquire(lock, shared, ev, notify):
1738       lock.acquire(shared=shared)
1739       try:
1740         notify.set()
1741         ev.wait()
1742       finally:
1743         lock.release()
1744
1745     for tlock1 in locks[::11]:
1746       for tlock2 in locks[::-15]:
1747         if tlock2 == tlock1:
1748           # Avoid deadlocks
1749           continue
1750
1751         for tlock3 in locks[::10]:
1752           if tlock3 in (tlock2, tlock1):
1753             # Avoid deadlocks
1754             continue
1755
1756           releaseev = threading.Event()
1757
1758           # Acquire locks
1759           acquireev = []
1760           tthreads1 = []
1761           for i in range(3):
1762             ev = threading.Event()
1763             tthreads1.append(self._addThread(target=_Acquire,
1764                                              args=(tlock1, 1, releaseev, ev)))
1765             acquireev.append(ev)
1766
1767           ev = threading.Event()
1768           tthread2 = self._addThread(target=_Acquire,
1769                                      args=(tlock2, 1, releaseev, ev))
1770           acquireev.append(ev)
1771
1772           ev = threading.Event()
1773           tthread3 = self._addThread(target=_Acquire,
1774                                      args=(tlock3, 0, releaseev, ev))
1775           acquireev.append(ev)
1776
1777           # Wait for all locks to be acquired
1778           for i in acquireev:
1779             i.wait()
1780
1781           # Check query result
1782           result = self.lm.QueryLocks(["name", "mode", "owner"])
1783           response = objects.QueryResponse.FromDict(result)
1784           for (name, mode, owner) in response.data:
1785             (name_status, name_value) = name
1786             (owner_status, owner_value) = owner
1787
1788             self.assertEqual(name_status, constants.RS_NORMAL)
1789             self.assertEqual(owner_status, constants.RS_NORMAL)
1790
1791             if name_value == tlock1.name:
1792               self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1793               self.assertEqual(set(owner_value),
1794                                set(i.getName() for i in tthreads1))
1795               continue
1796
1797             if name_value == tlock2.name:
1798               self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1799               self.assertEqual(owner_value, [tthread2.getName()])
1800               continue
1801
1802             if name_value == tlock3.name:
1803               self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1804               self.assertEqual(owner_value, [tthread3.getName()])
1805               continue
1806
1807             self.assert_(name_value in expnames)
1808             self.assertEqual(mode, (constants.RS_NORMAL, None))
1809             self.assert_(owner_value is None)
1810
1811           # Release locks again
1812           releaseev.set()
1813
1814           self._waitThreads()
1815
1816           result = self.lm.QueryLocks(["name", "mode", "owner"])
1817           self.assertEqual(objects.QueryResponse.FromDict(result).data,
1818                            [[(constants.RS_NORMAL, name),
1819                              (constants.RS_NORMAL, None),
1820                              (constants.RS_NORMAL, None)]
1821                             for name in utils.NiceSort(expnames)])
1822
1823   def testDelete(self):
1824     lock = locking.SharedLock("TestLock", monitor=self.lm)
1825
1826     self.assertEqual(len(self.lm._locks), 1)
1827     result = self.lm.QueryLocks(["name", "mode", "owner"])
1828     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1829                      [[(constants.RS_NORMAL, lock.name),
1830                        (constants.RS_NORMAL, None),
1831                        (constants.RS_NORMAL, None)]])
1832
1833     lock.delete()
1834
1835     result = self.lm.QueryLocks(["name", "mode", "owner"])
1836     self.assertEqual(objects.QueryResponse.FromDict(result).data,
1837                      [[(constants.RS_NORMAL, lock.name),
1838                        (constants.RS_NORMAL, "deleted"),
1839                        (constants.RS_NORMAL, None)]])
1840     self.assertEqual(len(self.lm._locks), 1)
1841
1842   def testPending(self):
1843     def _Acquire(lock, shared, prev, next):
1844       prev.wait()
1845
1846       lock.acquire(shared=shared, test_notify=next.set)
1847       try:
1848         pass
1849       finally:
1850         lock.release()
1851
1852     lock = locking.SharedLock("ExcLock", monitor=self.lm)
1853
1854     for shared in [0, 1]:
1855       lock.acquire()
1856       try:
1857         self.assertEqual(len(self.lm._locks), 1)
1858         result = self.lm.QueryLocks(["name", "mode", "owner"])
1859         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1860                          [[(constants.RS_NORMAL, lock.name),
1861                            (constants.RS_NORMAL, "exclusive"),
1862                            (constants.RS_NORMAL,
1863                             [threading.currentThread().getName()])]])
1864
1865         threads = []
1866
1867         first = threading.Event()
1868         prev = first
1869
1870         for i in range(5):
1871           ev = threading.Event()
1872           threads.append(self._addThread(target=_Acquire,
1873                                           args=(lock, shared, prev, ev)))
1874           prev = ev
1875
1876         # Start acquires
1877         first.set()
1878
1879         # Wait for last acquire to start waiting
1880         prev.wait()
1881
1882         # NOTE: This works only because QueryLocks will acquire the
1883         # lock-internal lock again and won't be able to get the information
1884         # until it has the lock. By then the acquire should be registered in
1885         # SharedLock.__pending (otherwise it's a bug).
1886
1887         # All acquires are waiting now
1888         if shared:
1889           pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
1890         else:
1891           pending = [("exclusive", [t.getName()]) for t in threads]
1892
1893         result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1894         self.assertEqual(objects.QueryResponse.FromDict(result).data,
1895                          [[(constants.RS_NORMAL, lock.name),
1896                            (constants.RS_NORMAL, "exclusive"),
1897                            (constants.RS_NORMAL,
1898                             [threading.currentThread().getName()]),
1899                            (constants.RS_NORMAL, pending)]])
1900
1901         self.assertEqual(len(self.lm._locks), 1)
1902       finally:
1903         lock.release()
1904
1905       self._waitThreads()
1906
1907       # No pending acquires
1908       result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1909       self.assertEqual(objects.QueryResponse.FromDict(result).data,
1910                        [[(constants.RS_NORMAL, lock.name),
1911                          (constants.RS_NORMAL, None),
1912                          (constants.RS_NORMAL, None),
1913                          (constants.RS_NORMAL, [])]])
1914
1915       self.assertEqual(len(self.lm._locks), 1)
1916
1917
1918 if __name__ == '__main__':
1919   testutils.GanetiTestProgram()