Convert the locking unittests to repetition-test
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 0.0510-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29
30 from ganeti import locking
31 from ganeti import errors
32 from threading import Thread
33
34
35 # This is used to test the ssynchronize decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
38
39 #: List for looping tests
40 ITERATIONS = range(8)
41
42 def _Repeat(fn):
43   """Decorator for executing a function many times"""
44   def wrapper(*args, **kwargs):
45     for i in ITERATIONS:
46       fn(*args, **kwargs)
47   return wrapper
48
49 class _ThreadedTestCase(unittest.TestCase):
50   """Test class that supports adding/waiting on threads"""
51   def setUp(self):
52     unittest.TestCase.setUp(self)
53     self.threads = []
54
55   def _addThread(self, *args, **kwargs):
56     """Create and remember a new thread"""
57     t = Thread(*args, **kwargs)
58     self.threads.append(t)
59     t.start()
60     return t
61
62   def _waitThreads(self):
63     """Wait for all our threads to finish"""
64     for t in self.threads:
65       t.join(60)
66       self.failIf(t.isAlive())
67     self.threads = []
68
69
70 class TestSharedLock(_ThreadedTestCase):
71   """SharedLock tests"""
72
73   def setUp(self):
74     _ThreadedTestCase.setUp(self)
75     self.sl = locking.SharedLock()
76     # helper threads use the 'done' queue to tell the master they finished.
77     self.done = Queue.Queue(0)
78
79   def testSequenceAndOwnership(self):
80     self.assert_(not self.sl._is_owned())
81     self.sl.acquire(shared=1)
82     self.assert_(self.sl._is_owned())
83     self.assert_(self.sl._is_owned(shared=1))
84     self.assert_(not self.sl._is_owned(shared=0))
85     self.sl.release()
86     self.assert_(not self.sl._is_owned())
87     self.sl.acquire()
88     self.assert_(self.sl._is_owned())
89     self.assert_(not self.sl._is_owned(shared=1))
90     self.assert_(self.sl._is_owned(shared=0))
91     self.sl.release()
92     self.assert_(not self.sl._is_owned())
93     self.sl.acquire(shared=1)
94     self.assert_(self.sl._is_owned())
95     self.assert_(self.sl._is_owned(shared=1))
96     self.assert_(not self.sl._is_owned(shared=0))
97     self.sl.release()
98     self.assert_(not self.sl._is_owned())
99
100   def testBooleanValue(self):
101     # semaphores are supposed to return a true value on a successful acquire
102     self.assert_(self.sl.acquire(shared=1))
103     self.sl.release()
104     self.assert_(self.sl.acquire())
105     self.sl.release()
106
107   def testDoubleLockingStoE(self):
108     self.sl.acquire(shared=1)
109     self.assertRaises(AssertionError, self.sl.acquire)
110
111   def testDoubleLockingEtoS(self):
112     self.sl.acquire()
113     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
114
115   def testDoubleLockingStoS(self):
116     self.sl.acquire(shared=1)
117     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
118
119   def testDoubleLockingEtoE(self):
120     self.sl.acquire()
121     self.assertRaises(AssertionError, self.sl.acquire)
122
123   # helper functions: called in a separate thread they acquire the lock, send
124   # their identifier on the done queue, then release it.
125   def _doItSharer(self):
126     try:
127       self.sl.acquire(shared=1)
128       self.done.put('SHR')
129       self.sl.release()
130     except errors.LockError:
131       self.done.put('ERR')
132
133   def _doItExclusive(self):
134     try:
135       self.sl.acquire()
136       self.done.put('EXC')
137       self.sl.release()
138     except errors.LockError:
139       self.done.put('ERR')
140
141   def _doItDelete(self):
142     try:
143       self.sl.delete()
144       self.done.put('DEL')
145     except errors.LockError:
146       self.done.put('ERR')
147
148   def testSharersCanCoexist(self):
149     self.sl.acquire(shared=1)
150     Thread(target=self._doItSharer).start()
151     self.assert_(self.done.get(True, 1))
152     self.sl.release()
153
154   @_Repeat
155   def testExclusiveBlocksExclusive(self):
156     self.sl.acquire()
157     self._addThread(target=self._doItExclusive)
158     self.assertRaises(Queue.Empty, self.done.get_nowait)
159     self.sl.release()
160     self._waitThreads()
161     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
162
163   @_Repeat
164   def testExclusiveBlocksDelete(self):
165     self.sl.acquire()
166     self._addThread(target=self._doItDelete)
167     self.assertRaises(Queue.Empty, self.done.get_nowait)
168     self.sl.release()
169     self._waitThreads()
170     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
171     self.sl = locking.SharedLock()
172
173   @_Repeat
174   def testExclusiveBlocksSharer(self):
175     self.sl.acquire()
176     self._addThread(target=self._doItSharer)
177     self.assertRaises(Queue.Empty, self.done.get_nowait)
178     self.sl.release()
179     self._waitThreads()
180     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
181
182   @_Repeat
183   def testSharerBlocksExclusive(self):
184     self.sl.acquire(shared=1)
185     self._addThread(target=self._doItExclusive)
186     self.assertRaises(Queue.Empty, self.done.get_nowait)
187     self.sl.release()
188     self._waitThreads()
189     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
190
191   @_Repeat
192   def testSharerBlocksDelete(self):
193     self.sl.acquire(shared=1)
194     self._addThread(target=self._doItDelete)
195     self.assertRaises(Queue.Empty, self.done.get_nowait)
196     self.sl.release()
197     self._waitThreads()
198     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
199     self.sl = locking.SharedLock()
200
201   @_Repeat
202   def testWaitingExclusiveBlocksSharer(self):
203     self.sl.acquire(shared=1)
204     # the lock is acquired in shared mode...
205     self._addThread(target=self._doItExclusive)
206     # ...but now an exclusive is waiting...
207     self._addThread(target=self._doItSharer)
208     # ...so the sharer should be blocked as well
209     self.assertRaises(Queue.Empty, self.done.get_nowait)
210     self.sl.release()
211     self._waitThreads()
212     # The exclusive passed before
213     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
214     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
215
216   @_Repeat
217   def testWaitingSharerBlocksExclusive(self):
218     self.sl.acquire()
219     # the lock is acquired in exclusive mode...
220     self._addThread(target=self._doItSharer)
221     # ...but now a sharer is waiting...
222     self._addThread(target=self._doItExclusive)
223     # ...the exclusive is waiting too...
224     self.assertRaises(Queue.Empty, self.done.get_nowait)
225     self.sl.release()
226     self._waitThreads()
227     # The sharer passed before
228     self.assertEqual(self.done.get_nowait(), 'SHR')
229     self.assertEqual(self.done.get_nowait(), 'EXC')
230
231   def testNoNonBlocking(self):
232     self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
233     self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
234     self.sl.acquire()
235     self.sl.delete(blocking=0) # Fine, because the lock is already acquired
236
237   def testDelete(self):
238     self.sl.delete()
239     self.assertRaises(errors.LockError, self.sl.acquire)
240     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
241     self.assertRaises(errors.LockError, self.sl.delete)
242
243   def testNoDeleteIfSharer(self):
244     self.sl.acquire(shared=1)
245     self.assertRaises(AssertionError, self.sl.delete)
246
247   @_Repeat
248   def testDeletePendingSharersExclusiveDelete(self):
249     self.sl.acquire()
250     self._addThread(target=self._doItSharer)
251     self._addThread(target=self._doItSharer)
252     self._addThread(target=self._doItExclusive)
253     self._addThread(target=self._doItDelete)
254     self.sl.delete()
255     self._waitThreads()
256     # The threads who were pending return ERR
257     for _ in range(4):
258       self.assertEqual(self.done.get_nowait(), 'ERR')
259     self.sl = locking.SharedLock()
260
261   @_Repeat
262   def testDeletePendingDeleteExclusiveSharers(self):
263     self.sl.acquire()
264     self._addThread(target=self._doItDelete)
265     self._addThread(target=self._doItExclusive)
266     self._addThread(target=self._doItSharer)
267     self._addThread(target=self._doItSharer)
268     self.sl.delete()
269     self._waitThreads()
270     # The two threads who were pending return both ERR
271     self.assertEqual(self.done.get_nowait(), 'ERR')
272     self.assertEqual(self.done.get_nowait(), 'ERR')
273     self.assertEqual(self.done.get_nowait(), 'ERR')
274     self.assertEqual(self.done.get_nowait(), 'ERR')
275     self.sl = locking.SharedLock()
276
277
278 class TestSSynchronizedDecorator(_ThreadedTestCase):
279   """Shared Lock Synchronized decorator test"""
280
281   def setUp(self):
282     _ThreadedTestCase.setUp(self)
283     # helper threads use the 'done' queue to tell the master they finished.
284     self.done = Queue.Queue(0)
285
286   @locking.ssynchronized(_decoratorlock)
287   def _doItExclusive(self):
288     self.assert_(_decoratorlock._is_owned())
289     self.done.put('EXC')
290
291   @locking.ssynchronized(_decoratorlock, shared=1)
292   def _doItSharer(self):
293     self.assert_(_decoratorlock._is_owned(shared=1))
294     self.done.put('SHR')
295
296   def testDecoratedFunctions(self):
297     self._doItExclusive()
298     self.assert_(not _decoratorlock._is_owned())
299     self._doItSharer()
300     self.assert_(not _decoratorlock._is_owned())
301
302   def testSharersCanCoexist(self):
303     _decoratorlock.acquire(shared=1)
304     Thread(target=self._doItSharer).start()
305     self.assert_(self.done.get(True, 1))
306     _decoratorlock.release()
307
308   @_Repeat
309   def testExclusiveBlocksExclusive(self):
310     _decoratorlock.acquire()
311     self._addThread(target=self._doItExclusive)
312     # give it a bit of time to check that it's not actually doing anything
313     self.assertRaises(Queue.Empty, self.done.get_nowait)
314     _decoratorlock.release()
315     self._waitThreads()
316     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
317
318   @_Repeat
319   def testExclusiveBlocksSharer(self):
320     _decoratorlock.acquire()
321     self._addThread(target=self._doItSharer)
322     self.assertRaises(Queue.Empty, self.done.get_nowait)
323     _decoratorlock.release()
324     self._waitThreads()
325     self.failUnlessEqual(self.done.get_nowait(), 'SHR')
326
327   @_Repeat
328   def testSharerBlocksExclusive(self):
329     _decoratorlock.acquire(shared=1)
330     self._addThread(target=self._doItExclusive)
331     self.assertRaises(Queue.Empty, self.done.get_nowait)
332     _decoratorlock.release()
333     self._waitThreads()
334     self.failUnlessEqual(self.done.get_nowait(), 'EXC')
335
336
337 class TestLockSet(_ThreadedTestCase):
338   """LockSet tests"""
339
340   def setUp(self):
341     _ThreadedTestCase.setUp(self)
342     self._setUpLS()
343     # helper threads use the 'done' queue to tell the master they finished.
344     self.done = Queue.Queue(0)
345
346   def _setUpLS(self):
347     """Helper to (re)initialize the lock set"""
348     self.resources = ['one', 'two', 'three']
349     self.ls = locking.LockSet(members=self.resources)
350
351
352   def testResources(self):
353     self.assertEquals(self.ls._names(), set(self.resources))
354     newls = locking.LockSet()
355     self.assertEquals(newls._names(), set())
356
357   def testAcquireRelease(self):
358     self.assert_(self.ls.acquire('one'))
359     self.assertEquals(self.ls._list_owned(), set(['one']))
360     self.ls.release()
361     self.assertEquals(self.ls._list_owned(), set())
362     self.assertEquals(self.ls.acquire(['one']), set(['one']))
363     self.assertEquals(self.ls._list_owned(), set(['one']))
364     self.ls.release()
365     self.assertEquals(self.ls._list_owned(), set())
366     self.ls.acquire(['one', 'two', 'three'])
367     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
368     self.ls.release('one')
369     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
370     self.ls.release(['three'])
371     self.assertEquals(self.ls._list_owned(), set(['two']))
372     self.ls.release()
373     self.assertEquals(self.ls._list_owned(), set())
374     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
375     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
376     self.ls.release()
377     self.assertEquals(self.ls._list_owned(), set())
378
379   def testNoDoubleAcquire(self):
380     self.ls.acquire('one')
381     self.assertRaises(AssertionError, self.ls.acquire, 'one')
382     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
383     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
384     self.ls.release()
385     self.ls.acquire(['one', 'three'])
386     self.ls.release('one')
387     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
388     self.ls.release('three')
389
390   def testNoWrongRelease(self):
391     self.assertRaises(AssertionError, self.ls.release)
392     self.ls.acquire('one')
393     self.assertRaises(AssertionError, self.ls.release, 'two')
394
395   def testAddRemove(self):
396     self.ls.add('four')
397     self.assertEquals(self.ls._list_owned(), set())
398     self.assert_('four' in self.ls._names())
399     self.ls.add(['five', 'six', 'seven'], acquired=1)
400     self.assert_('five' in self.ls._names())
401     self.assert_('six' in self.ls._names())
402     self.assert_('seven' in self.ls._names())
403     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
404     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
405     self.assert_('five' not in self.ls._names())
406     self.assert_('six' not in self.ls._names())
407     self.assertEquals(self.ls._list_owned(), set(['seven']))
408     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
409     self.ls.remove('seven')
410     self.assert_('seven' not in self.ls._names())
411     self.assertEquals(self.ls._list_owned(), set([]))
412     self.ls.acquire(None, shared=1)
413     self.assertRaises(AssertionError, self.ls.add, 'eight')
414     self.ls.release()
415     self.ls.acquire(None)
416     self.ls.add('eight', acquired=1)
417     self.assert_('eight' in self.ls._names())
418     self.assert_('eight' in self.ls._list_owned())
419     self.ls.add('nine')
420     self.assert_('nine' in self.ls._names())
421     self.assert_('nine' not in self.ls._list_owned())
422     self.ls.release()
423     self.ls.remove(['two'])
424     self.assert_('two' not in self.ls._names())
425     self.ls.acquire('three')
426     self.assertEquals(self.ls.remove(['three']), ['three'])
427     self.assert_('three' not in self.ls._names())
428     self.assertEquals(self.ls.remove('three'), [])
429     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
430     self.assert_('one' not in self.ls._names())
431
432   def testRemoveNonBlocking(self):
433     self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
434     self.ls.acquire('one')
435     self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
436     self.ls.acquire(['two', 'three'])
437     self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
438                       ['two', 'three'])
439
440   def testNoDoubleAdd(self):
441     self.assertRaises(errors.LockError, self.ls.add, 'two')
442     self.ls.add('four')
443     self.assertRaises(errors.LockError, self.ls.add, 'four')
444
445   def testNoWrongRemoves(self):
446     self.ls.acquire(['one', 'three'], shared=1)
447     # Cannot remove 'two' while holding something which is not a superset
448     self.assertRaises(AssertionError, self.ls.remove, 'two')
449     # Cannot remove 'three' as we are sharing it
450     self.assertRaises(AssertionError, self.ls.remove, 'three')
451
452   def testAcquireSetLock(self):
453     # acquire the set-lock exclusively
454     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
455     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
456     self.assertEquals(self.ls._is_owned(), True)
457     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
458     # I can still add/remove elements...
459     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
460     self.assert_(self.ls.add('six'))
461     self.ls.release()
462     # share the set-lock
463     self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
464     # adding new elements is not possible
465     self.assertRaises(AssertionError, self.ls.add, 'five')
466     self.ls.release()
467
468   def testAcquireWithRepetitions(self):
469     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
470                       set(['two', 'two', 'three']))
471     self.ls.release(['two', 'two'])
472     self.assertEquals(self.ls._list_owned(), set(['three']))
473
474   def testEmptyAcquire(self):
475     # Acquire an empty list of locks...
476     self.assertEquals(self.ls.acquire([]), set())
477     self.assertEquals(self.ls._list_owned(), set())
478     # New locks can still be addded
479     self.assert_(self.ls.add('six'))
480     # "re-acquiring" is not an issue, since we had really acquired nothing
481     self.assertEquals(self.ls.acquire([], shared=1), set())
482     self.assertEquals(self.ls._list_owned(), set())
483     # We haven't really acquired anything, so we cannot release
484     self.assertRaises(AssertionError, self.ls.release)
485
486   def _doLockSet(self, set, shared):
487     try:
488       self.ls.acquire(set, shared=shared)
489       self.done.put('DONE')
490       self.ls.release()
491     except errors.LockError:
492       self.done.put('ERR')
493
494   def _doAddSet(self, set):
495     try:
496       self.ls.add(set, acquired=1)
497       self.done.put('DONE')
498       self.ls.release()
499     except errors.LockError:
500       self.done.put('ERR')
501
502   def _doRemoveSet(self, set):
503     self.done.put(self.ls.remove(set))
504
505   @_Repeat
506   def testConcurrentSharedAcquire(self):
507     self.ls.acquire(['one', 'two'], shared=1)
508     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
509     self._waitThreads()
510     self.assertEqual(self.done.get_nowait(), 'DONE')
511     self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
512     self._waitThreads()
513     self.assertEqual(self.done.get_nowait(), 'DONE')
514     self._addThread(target=self._doLockSet, args=('three', 1))
515     self._waitThreads()
516     self.assertEqual(self.done.get_nowait(), 'DONE')
517     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
518     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
519     self.assertRaises(Queue.Empty, self.done.get_nowait)
520     self.ls.release()
521     self._waitThreads()
522     self.assertEqual(self.done.get_nowait(), 'DONE')
523     self.assertEqual(self.done.get_nowait(), 'DONE')
524
525   @_Repeat
526   def testConcurrentExclusiveAcquire(self):
527     self.ls.acquire(['one', 'two'])
528     self._addThread(target=self._doLockSet, args=('three', 1))
529     self._waitThreads()
530     self.assertEqual(self.done.get_nowait(), 'DONE')
531     self._addThread(target=self._doLockSet, args=('three', 0))
532     self._waitThreads()
533     self.assertEqual(self.done.get_nowait(), 'DONE')
534     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
535     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
536     self._addThread(target=self._doLockSet, args=('one', 0))
537     self._addThread(target=self._doLockSet, args=('one', 1))
538     self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
539     self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
540     self.assertRaises(Queue.Empty, self.done.get_nowait)
541     self.ls.release()
542     self._waitThreads()
543     for _ in range(6):
544       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
545
546   @_Repeat
547   def testConcurrentRemove(self):
548     self.ls.add('four')
549     self.ls.acquire(['one', 'two', 'four'])
550     self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
551     self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
552     self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
553     self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
554     self.assertRaises(Queue.Empty, self.done.get_nowait)
555     self.ls.remove('one')
556     self.ls.release()
557     self._waitThreads()
558     for i in range(4):
559       self.failUnlessEqual(self.done.get_nowait(), 'ERR')
560     self.ls.add(['five', 'six'], acquired=1)
561     self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
562     self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
563     self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
564     self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
565     self.ls.remove('five')
566     self.ls.release()
567     self._waitThreads()
568     for i in range(4):
569       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
570     self.ls.acquire(['three', 'four'])
571     self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
572     self.assertRaises(Queue.Empty, self.done.get_nowait)
573     self.ls.remove('four')
574     self._waitThreads()
575     self.assertEqual(self.done.get_nowait(), ['six'])
576     self._addThread(target=self._doRemoveSet, args=(['two']))
577     self._waitThreads()
578     self.assertEqual(self.done.get_nowait(), ['two'])
579     self.ls.release()
580     # reset lockset
581     self._setUpLS()
582
583   @_Repeat
584   def testConcurrentSharedSetLock(self):
585     # share the set-lock...
586     self.ls.acquire(None, shared=1)
587     # ...another thread can share it too
588     self._addThread(target=self._doLockSet, args=(None, 1))
589     self._waitThreads()
590     self.assertEqual(self.done.get_nowait(), 'DONE')
591     # ...or just share some elements
592     self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
593     self._waitThreads()
594     self.assertEqual(self.done.get_nowait(), 'DONE')
595     # ...but not add new ones or remove any
596     t = self._addThread(target=self._doAddSet, args=(['nine']))
597     self._addThread(target=self._doRemoveSet, args=(['two'], ))
598     self.assertRaises(Queue.Empty, self.done.get_nowait)
599     # this just releases the set-lock
600     self.ls.release([])
601     t.join(60)
602     self.assertEqual(self.done.get_nowait(), 'DONE')
603     # release the lock on the actual elements so remove() can proceed too
604     self.ls.release()
605     self._waitThreads()
606     self.failUnlessEqual(self.done.get_nowait(), ['two'])
607     # reset lockset
608     self._setUpLS()
609
610   @_Repeat
611   def testConcurrentExclusiveSetLock(self):
612     # acquire the set-lock...
613     self.ls.acquire(None, shared=0)
614     # ...no one can do anything else
615     self._addThread(target=self._doLockSet, args=(None, 1))
616     self._addThread(target=self._doLockSet, args=(None, 0))
617     self._addThread(target=self._doLockSet, args=(['three'], 0))
618     self._addThread(target=self._doLockSet, args=(['two'], 1))
619     self._addThread(target=self._doAddSet, args=(['nine']))
620     self.assertRaises(Queue.Empty, self.done.get_nowait)
621     self.ls.release()
622     self._waitThreads()
623     for _ in range(5):
624       self.assertEqual(self.done.get(True, 1), 'DONE')
625     # cleanup
626     self._setUpLS()
627
628   @_Repeat
629   def testConcurrentSetLockAdd(self):
630     self.ls.acquire('one')
631     # Another thread wants the whole SetLock
632     self._addThread(target=self._doLockSet, args=(None, 0))
633     self._addThread(target=self._doLockSet, args=(None, 1))
634     self.assertRaises(Queue.Empty, self.done.get_nowait)
635     self.assertRaises(AssertionError, self.ls.add, 'four')
636     self.ls.release()
637     self._waitThreads()
638     self.assertEqual(self.done.get_nowait(), 'DONE')
639     self.assertEqual(self.done.get_nowait(), 'DONE')
640     self.ls.acquire(None)
641     self._addThread(target=self._doLockSet, args=(None, 0))
642     self._addThread(target=self._doLockSet, args=(None, 1))
643     self.assertRaises(Queue.Empty, self.done.get_nowait)
644     self.ls.add('four')
645     self.ls.add('five', acquired=1)
646     self.ls.add('six', acquired=1, shared=1)
647     self.assertEquals(self.ls._list_owned(),
648       set(['one', 'two', 'three', 'five', 'six']))
649     self.assertEquals(self.ls._is_owned(), True)
650     self.assertEquals(self.ls._names(),
651       set(['one', 'two', 'three', 'four', 'five', 'six']))
652     self.ls.release()
653     self._waitThreads()
654     self.assertEqual(self.done.get_nowait(), 'DONE')
655     self.assertEqual(self.done.get_nowait(), 'DONE')
656     self._setUpLS()
657
658   @_Repeat
659   def testEmptyLockSet(self):
660     # get the set-lock
661     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
662     # now empty it...
663     self.ls.remove(['one', 'two', 'three'])
664     # and adds/locks by another thread still wait
665     self._addThread(target=self._doAddSet, args=(['nine']))
666     self._addThread(target=self._doLockSet, args=(None, 1))
667     self._addThread(target=self._doLockSet, args=(None, 0))
668     self.assertRaises(Queue.Empty, self.done.get_nowait)
669     self.ls.release()
670     self._waitThreads()
671     for _ in range(3):
672       self.assertEqual(self.done.get_nowait(), 'DONE')
673     # empty it again...
674     self.assertEqual(self.ls.remove(['nine']), ['nine'])
675     # now share it...
676     self.assertEqual(self.ls.acquire(None, shared=1), set())
677     # other sharers can go, adds still wait
678     self._addThread(target=self._doLockSet, args=(None, 1))
679     self._waitThreads()
680     self.assertEqual(self.done.get_nowait(), 'DONE')
681     self._addThread(target=self._doAddSet, args=(['nine']))
682     self.assertRaises(Queue.Empty, self.done.get_nowait)
683     self.ls.release()
684     self._waitThreads()
685     self.assertEqual(self.done.get_nowait(), 'DONE')
686     self._setUpLS()
687
688
689 class TestGanetiLockManager(_ThreadedTestCase):
690
691   def setUp(self):
692     _ThreadedTestCase.setUp(self)
693     self.nodes=['n1', 'n2']
694     self.instances=['i1', 'i2', 'i3']
695     self.GL = locking.GanetiLockManager(nodes=self.nodes,
696                                         instances=self.instances)
697     self.done = Queue.Queue(0)
698
699   def tearDown(self):
700     # Don't try this at home...
701     locking.GanetiLockManager._instance = None
702
703   def testLockingConstants(self):
704     # The locking library internally cheats by assuming its constants have some
705     # relationships with each other. Check those hold true.
706     # This relationship is also used in the Processor to recursively acquire
707     # the right locks. Again, please don't break it.
708     for i in range(len(locking.LEVELS)):
709       self.assertEqual(i, locking.LEVELS[i])
710
711   def testDoubleGLFails(self):
712     self.assertRaises(AssertionError, locking.GanetiLockManager)
713
714   def testLockNames(self):
715     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
716     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
717     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
718                      set(self.instances))
719
720   def testInitAndResources(self):
721     locking.GanetiLockManager._instance = None
722     self.GL = locking.GanetiLockManager()
723     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
724     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
725     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
726
727     locking.GanetiLockManager._instance = None
728     self.GL = locking.GanetiLockManager(nodes=self.nodes)
729     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
730     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
731     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
732
733     locking.GanetiLockManager._instance = None
734     self.GL = locking.GanetiLockManager(instances=self.instances)
735     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
736     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
737     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
738                      set(self.instances))
739
740   def testAcquireRelease(self):
741     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
742     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
743     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
744     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
745     self.GL.release(locking.LEVEL_NODE, ['n2'])
746     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
747     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
748     self.GL.release(locking.LEVEL_NODE)
749     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
750     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
751     self.GL.release(locking.LEVEL_INSTANCE)
752     self.assertRaises(errors.LockError, self.GL.acquire,
753                       locking.LEVEL_INSTANCE, ['i5'])
754     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
755     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
756
757   def testAcquireWholeSets(self):
758     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
759     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
760                       set(self.instances))
761     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
762                       set(self.instances))
763     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
764                       set(self.nodes))
765     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
766                       set(self.nodes))
767     self.GL.release(locking.LEVEL_NODE)
768     self.GL.release(locking.LEVEL_INSTANCE)
769     self.GL.release(locking.LEVEL_CLUSTER)
770
771   def testAcquireWholeAndPartial(self):
772     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
773     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
774                       set(self.instances))
775     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
776                       set(self.instances))
777     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
778                       set(['n2']))
779     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
780                       set(['n2']))
781     self.GL.release(locking.LEVEL_NODE)
782     self.GL.release(locking.LEVEL_INSTANCE)
783     self.GL.release(locking.LEVEL_CLUSTER)
784
785   def testBGLDependency(self):
786     self.assertRaises(AssertionError, self.GL.acquire,
787                       locking.LEVEL_NODE, ['n1', 'n2'])
788     self.assertRaises(AssertionError, self.GL.acquire,
789                       locking.LEVEL_INSTANCE, ['i3'])
790     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
791     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
792     self.assertRaises(AssertionError, self.GL.release,
793                       locking.LEVEL_CLUSTER, ['BGL'])
794     self.assertRaises(AssertionError, self.GL.release,
795                       locking.LEVEL_CLUSTER)
796     self.GL.release(locking.LEVEL_NODE)
797     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
798     self.assertRaises(AssertionError, self.GL.release,
799                       locking.LEVEL_CLUSTER, ['BGL'])
800     self.assertRaises(AssertionError, self.GL.release,
801                       locking.LEVEL_CLUSTER)
802     self.GL.release(locking.LEVEL_INSTANCE)
803
804   def testWrongOrder(self):
805     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
806     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
807     self.assertRaises(AssertionError, self.GL.acquire,
808                       locking.LEVEL_NODE, ['n1'])
809     self.assertRaises(AssertionError, self.GL.acquire,
810                       locking.LEVEL_INSTANCE, ['i2'])
811
812   # Helper function to run as a thread that shared the BGL and then acquires
813   # some locks at another level.
814   def _doLock(self, level, names, shared):
815     try:
816       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
817       self.GL.acquire(level, names, shared=shared)
818       self.done.put('DONE')
819       self.GL.release(level)
820       self.GL.release(locking.LEVEL_CLUSTER)
821     except errors.LockError:
822       self.done.put('ERR')
823
824   @_Repeat
825   def testConcurrency(self):
826     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
827     self._addThread(target=self._doLock,
828                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
829     self._waitThreads()
830     self.assertEqual(self.done.get_nowait(), 'DONE')
831     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
832     self._addThread(target=self._doLock,
833                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
834     self._waitThreads()
835     self.assertEqual(self.done.get_nowait(), 'DONE')
836     self._addThread(target=self._doLock,
837                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
838     self.assertRaises(Queue.Empty, self.done.get_nowait)
839     self.GL.release(locking.LEVEL_INSTANCE)
840     self._waitThreads()
841     self.assertEqual(self.done.get_nowait(), 'DONE')
842     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
843     self._addThread(target=self._doLock,
844                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
845     self._waitThreads()
846     self.assertEqual(self.done.get_nowait(), 'DONE')
847     self._addThread(target=self._doLock,
848                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
849     self.assertRaises(Queue.Empty, self.done.get_nowait)
850     self.GL.release(locking.LEVEL_INSTANCE)
851     self._waitThreads()
852     self.assertEqual(self.done.get(True, 1), 'DONE')
853     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
854
855
856 if __name__ == '__main__':
857   unittest.main()
858   #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
859   #unittest.TextTestRunner(verbosity=2).run(suite)