LUSetInstanceParams: nic parameters
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29
30 from ganeti import locking
31 from ganeti import errors
32 from threading import Thread
33
34
35 # This is used to test the ssynchronized decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
38
39 #: List for looping tests
40 ITERATIONS = range(8)
41
42 def _Repeat(fn):
43   """Decorator for executing a function many times"""
44   def wrapper(*args, **kwargs):
45     for i in ITERATIONS:
46       fn(*args, **kwargs)
47   return wrapper
48
49 class _ThreadedTestCase(unittest.TestCase):
50   """Test class that supports adding/waiting on threads"""
51   def setUp(self):
52     unittest.TestCase.setUp(self)
53     self.threads = []
54
55   def _addThread(self, *args, **kwargs):
56     """Create and remember a new thread"""
57     t = Thread(*args, **kwargs)
58     self.threads.append(t)
59     t.start()
60     return t
61
62   def _waitThreads(self):
63     """Wait for all our threads to finish"""
64     for t in self.threads:
65       t.join(60)
66       self.failIf(t.isAlive())
67     self.threads = []
68
69
class TestSharedLock(_ThreadedTestCase):
  """SharedLock tests

  Helper threads report on the 'done' queue: 'SHR', 'EXC' or 'DEL' once the
  corresponding operation went through, 'ERR' if a LockError was raised.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock()
    # helper threads use the 'done' queue to tell the master they finished.
    self.done = Queue.Queue(0)

  def testSequenceAndOwnership(self):
    # Ownership must follow every acquire/release, in both sharing modes
    self.assert_(not self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assert_(self.sl._is_owned())
    self.assert_(self.sl._is_owned(shared=1))
    self.assert_(not self.sl._is_owned(shared=0))
    self.sl.release()
    self.assert_(not self.sl._is_owned())
    self.sl.acquire()
    self.assert_(self.sl._is_owned())
    self.assert_(not self.sl._is_owned(shared=1))
    self.assert_(self.sl._is_owned(shared=0))
    self.sl.release()
    self.assert_(not self.sl._is_owned())
    self.sl.acquire(shared=1)
    self.assert_(self.sl._is_owned())
    self.assert_(self.sl._is_owned(shared=1))
    self.assert_(not self.sl._is_owned(shared=0))
    self.sl.release()
    self.assert_(not self.sl._is_owned())

  def testBooleanValue(self):
    # semaphores are supposed to return a true value on a successful acquire
    self.assert_(self.sl.acquire(shared=1))
    self.sl.release()
    self.assert_(self.sl.acquire())
    self.sl.release()

  # Acquiring a lock that is already held by the same thread must trip an
  # assertion, whatever the combination of modes.
  def testDoubleLockingStoE(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire)

  def testDoubleLockingEtoS(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingStoS(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingEtoE(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire)

  # helper functions: called in a separate thread they acquire the lock, send
  # their identifier on the done queue, then release it.
  def _doItSharer(self):
    """Take the lock in shared mode and report 'SHR' ('ERR' on LockError)"""
    try:
      self.sl.acquire(shared=1)
      self.done.put('SHR')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItExclusive(self):
    """Take the lock exclusively and report 'EXC' ('ERR' on LockError)"""
    try:
      self.sl.acquire()
      self.done.put('EXC')
      self.sl.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doItDelete(self):
    """Delete the lock and report 'DEL' ('ERR' on LockError)"""
    try:
      self.sl.delete()
      self.done.put('DEL')
    except errors.LockError:
      self.done.put('ERR')

  def testSharersCanCoexist(self):
    self.sl.acquire(shared=1)
    # Started through _addThread so that the helper is tracked and joined
    # before the test ends, as in all the other threaded tests
    self._addThread(target=self._doItSharer)
    self.assert_(self.done.get(True, 1))
    self.sl.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    self.sl.acquire()
    self._addThread(target=self._doItExclusive)
    # nothing may be reported while we still hold the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
    # the lock is gone now, get a new one for the next iteration
    self.sl = locking.SharedLock()

  @_Repeat
  def testExclusiveBlocksSharer(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testSharerBlocksDelete(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
    # the lock is gone now, get a new one for the next iteration
    self.sl = locking.SharedLock()

  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlocksSharer"""
    # Disabled on purpose: the early return keeps the body below from running
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # Disabled on purpose: the early return keeps the body below from running
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), 'SHR')
    self.assertEqual(self.done.get_nowait(), 'EXC')

  def testNoNonBlocking(self):
    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
    self.sl.acquire()
    self.sl.delete(blocking=0) # Fine, because the lock is already acquired

  def testDelete(self):
    # once deleted the lock refuses any further operation
    self.sl.delete()
    self.assertRaises(errors.LockError, self.sl.acquire)
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
    self.assertRaises(errors.LockError, self.sl.delete)

  def testNoDeleteIfSharer(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.delete)

  @_Repeat
  def testDeletePendingSharersExclusiveDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItDelete)
    self.sl.delete()
    self._waitThreads()
    # The threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.sl = locking.SharedLock()

  @_Repeat
  def testDeletePendingDeleteExclusiveSharers(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self.sl.delete()
    self._waitThreads()
    # All four threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), 'ERR')
    self.sl = locking.SharedLock()
282
283
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test

  The decorated helpers run with the module-level _decoratorlock held and
  report 'EXC' or 'SHR' on the 'done' queue.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    # helper threads use the 'done' queue to tell the master they finished.
    self.done = Queue.Queue(0)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Check the lock is owned while running, then report 'EXC'"""
    self.assert_(_decoratorlock._is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Check the lock is owned in shared mode, then report 'SHR'"""
    self.assert_(_decoratorlock._is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    # the lock must not stay owned once a decorated function has returned
    self._doItExclusive()
    self.assert_(not _decoratorlock._is_owned())
    self._doItSharer()
    self.assert_(not _decoratorlock._is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    # Started through _addThread so that the helper is tracked and joined
    # before the test ends, as in all the other threaded tests
    self._addThread(target=self._doItSharer)
    self.assert_(self.done.get(True, 1))
    _decoratorlock.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItExclusive)
    # the helper cannot have reported anything while we hold the lock
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    _decoratorlock.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
341
342
class TestLockSet(_ThreadedTestCase):
  """LockSet tests

  Helper threads report on the 'done' queue: 'DONE' when their operation
  went through, 'ERR' if a LockError was raised, or (for removals) the
  value returned by LockSet.remove.

  """

  def setUp(self):
    _ThreadedTestCase.setUp(self)
    self._setUpLS()
    # helper threads use the 'done' queue to tell the master they finished.
    self.done = Queue.Queue(0)

  def _setUpLS(self):
    """Helper to (re)initialize the lock set"""
    self.resources = ['one', 'two', 'three']
    self.ls = locking.LockSet(members=self.resources)


  def testResources(self):
    self.assertEquals(self.ls._names(), set(self.resources))
    newls = locking.LockSet()
    self.assertEquals(newls._names(), set())

  def testAcquireRelease(self):
    self.assert_(self.ls.acquire('one'))
    self.assertEquals(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
    self.assertEquals(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())
    self.ls.acquire(['one', 'two', 'three'])
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
    self.ls.release('one')
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
    self.ls.release(['three'])
    self.assertEquals(self.ls._list_owned(), set(['two']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())

  def testNoDoubleAcquire(self):
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
    self.ls.release()
    self.ls.acquire(['one', 'three'])
    self.ls.release('one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.ls.release('three')

  def testNoWrongRelease(self):
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.release, 'two')

  def testAddRemove(self):
    self.ls.add('four')
    self.assertEquals(self.ls._list_owned(), set())
    self.assert_('four' in self.ls._names())
    self.ls.add(['five', 'six', 'seven'], acquired=1)
    self.assert_('five' in self.ls._names())
    self.assert_('six' in self.ls._names())
    self.assert_('seven' in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
    self.assert_('five' not in self.ls._names())
    self.assert_('six' not in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set(['seven']))
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
    self.ls.remove('seven')
    self.assert_('seven' not in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set([]))
    self.ls.acquire(None, shared=1)
    self.assertRaises(AssertionError, self.ls.add, 'eight')
    self.ls.release()
    self.ls.acquire(None)
    self.ls.add('eight', acquired=1)
    self.assert_('eight' in self.ls._names())
    self.assert_('eight' in self.ls._list_owned())
    self.ls.add('nine')
    self.assert_('nine' in self.ls._names())
    self.assert_('nine' not in self.ls._list_owned())
    self.ls.release()
    self.ls.remove(['two'])
    self.assert_('two' not in self.ls._names())
    self.ls.acquire('three')
    self.assertEquals(self.ls.remove(['three']), ['three'])
    self.assert_('three' not in self.ls._names())
    self.assertEquals(self.ls.remove('three'), [])
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
    self.assert_('one' not in self.ls._names())

  def testRemoveNonBlocking(self):
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
    self.ls.acquire('one')
    self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
    self.ls.acquire(['two', 'three'])
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
                      ['two', 'three'])

  def testNoDoubleAdd(self):
    self.assertRaises(errors.LockError, self.ls.add, 'two')
    self.ls.add('four')
    self.assertRaises(errors.LockError, self.ls.add, 'four')

  def testNoWrongRemoves(self):
    self.ls.acquire(['one', 'three'], shared=1)
    # Cannot remove 'two' while holding something which is not a superset
    self.assertRaises(AssertionError, self.ls.remove, 'two')
    # Cannot remove 'three' as we are sharing it
    self.assertRaises(AssertionError, self.ls.remove, 'three')

  def testAcquireSetLock(self):
    # acquire the set-lock exclusively
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
    self.assertEquals(self.ls._is_owned(), True)
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
    # I can still add/remove elements...
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
    self.assert_(self.ls.add('six'))
    self.ls.release()
    # share the set-lock
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
    # adding new elements is not possible
    self.assertRaises(AssertionError, self.ls.add, 'five')
    self.ls.release()

  def testAcquireWithRepetitions(self):
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
                      set(['two', 'two', 'three']))
    self.ls.release(['two', 'two'])
    self.assertEquals(self.ls._list_owned(), set(['three']))

  def testEmptyAcquire(self):
    # Acquire an empty list of locks...
    self.assertEquals(self.ls.acquire([]), set())
    self.assertEquals(self.ls._list_owned(), set())
    # New locks can still be added
    self.assert_(self.ls.add('six'))
    # "re-acquiring" is not an issue, since we had really acquired nothing
    self.assertEquals(self.ls.acquire([], shared=1), set())
    self.assertEquals(self.ls._list_owned(), set())
    # We haven't really acquired anything, so we cannot release
    self.assertRaises(AssertionError, self.ls.release)

  def _doLockSet(self, set, shared):
    """Acquire then release the given locks, reporting on the 'done' queue

    'DONE' is put on the queue once the acquire went through, 'ERR' if a
    LockError was raised.

    """
    try:
      self.ls.acquire(set, shared=shared)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doAddSet(self, set):
    """Add the given locks as acquired, report, then release them

    'DONE' is put on the queue once the add went through, 'ERR' if a
    LockError was raised.

    """
    try:
      self.ls.add(set, acquired=1)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doRemoveSet(self, set):
    """Remove the given locks and put the result of remove() on the queue"""
    self.done.put(self.ls.remove(set))

  @_Repeat
  def testConcurrentSharedAcquire(self):
    self.ls.acquire(['one', 'two'], shared=1)
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testConcurrentExclusiveAcquire(self):
    self.ls.acquire(['one', 'two'])
    self._addThread(target=self._doLockSet, args=('three', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=('three', 0))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self._addThread(target=self._doLockSet, args=('one', 0))
    self._addThread(target=self._doLockSet, args=('one', 1))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(6):
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')

  @_Repeat
  def testConcurrentRemove(self):
    self.ls.add('four')
    self.ls.acquire(['one', 'two', 'four'])
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('one')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
    self.ls.add(['five', 'six'], acquired=1)
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
    self.ls.remove('five')
    self.ls.release()
    self._waitThreads()
    for _ in range(4):
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(['three', 'four'])
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.remove('four')
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['six'])
    # args must be a tuple: the trailing comma makes the helper receive the
    # list itself, like in the other _doRemoveSet calls
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), ['two'])
    self.ls.release()
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentSharedSetLock(self):
    # share the set-lock...
    self.ls.acquire(None, shared=1)
    # ...another thread can share it too
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...or just share some elements
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...but not add new ones or remove any
    t = self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    # this just releases the set-lock
    self.ls.release([])
    t.join(60)
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # release the lock on the actual elements so remove() can proceed too
    self.ls.release()
    self._waitThreads()
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
    # reset lockset
    self._setUpLS()

  @_Repeat
  def testConcurrentExclusiveSetLock(self):
    # acquire the set-lock...
    self.ls.acquire(None, shared=0)
    # ...no one can do anything else
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(['three'], 0))
    self._addThread(target=self._doLockSet, args=(['two'], 1))
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(5):
      self.assertEqual(self.done.get(True, 1), 'DONE')
    # cleanup
    self._setUpLS()

  @_Repeat
  def testConcurrentSetLockAdd(self):
    self.ls.acquire('one')
    # Another thread wants the whole SetLock
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(AssertionError, self.ls.add, 'four')
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.ls.acquire(None)
    self._addThread(target=self._doLockSet, args=(None, 0))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.add('four')
    self.ls.add('five', acquired=1)
    self.ls.add('six', acquired=1, shared=1)
    self.assertEquals(self.ls._list_owned(),
      set(['one', 'two', 'three', 'five', 'six']))
    self.assertEquals(self.ls._is_owned(), True)
    self.assertEquals(self.ls._names(),
      set(['one', 'two', 'three', 'four', 'five', 'six']))
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()

  @_Repeat
  def testEmptyLockSet(self):
    # get the set-lock
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    # now empty it...
    self.ls.remove(['one', 'two', 'three'])
    # and adds/locks by another thread still wait
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), 'DONE')
    # empty it again...
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
    # now share it...
    self.assertEqual(self.ls.acquire(None, shared=1), set())
    # other sharers can go, adds still wait
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doAddSet, args=(['nine'], ))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._setUpLS()
693
694
695 class TestGanetiLockManager(_ThreadedTestCase):
696
697   def setUp(self):
698     _ThreadedTestCase.setUp(self)
699     self.nodes=['n1', 'n2']
700     self.instances=['i1', 'i2', 'i3']
701     self.GL = locking.GanetiLockManager(nodes=self.nodes,
702                                         instances=self.instances)
703     self.done = Queue.Queue(0)
704
705   def tearDown(self):
706     # Don't try this at home...
707     locking.GanetiLockManager._instance = None
708
709   def testLockingConstants(self):
710     # The locking library internally cheats by assuming its constants have some
711     # relationships with each other. Check those hold true.
712     # This relationship is also used in the Processor to recursively acquire
713     # the right locks. Again, please don't break it.
714     for i in range(len(locking.LEVELS)):
715       self.assertEqual(i, locking.LEVELS[i])
716
717   def testDoubleGLFails(self):
718     self.assertRaises(AssertionError, locking.GanetiLockManager)
719
720   def testLockNames(self):
721     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
722     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
723     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
724                      set(self.instances))
725
726   def testInitAndResources(self):
727     locking.GanetiLockManager._instance = None
728     self.GL = locking.GanetiLockManager()
729     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
730     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
731     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
732
733     locking.GanetiLockManager._instance = None
734     self.GL = locking.GanetiLockManager(nodes=self.nodes)
735     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
736     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
737     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
738
739     locking.GanetiLockManager._instance = None
740     self.GL = locking.GanetiLockManager(instances=self.instances)
741     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
742     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
743     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
744                      set(self.instances))
745
746   def testAcquireRelease(self):
747     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
748     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
749     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
750     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
751     self.GL.release(locking.LEVEL_NODE, ['n2'])
752     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
753     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
754     self.GL.release(locking.LEVEL_NODE)
755     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
756     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
757     self.GL.release(locking.LEVEL_INSTANCE)
758     self.assertRaises(errors.LockError, self.GL.acquire,
759                       locking.LEVEL_INSTANCE, ['i5'])
760     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
761     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
762
763   def testAcquireWholeSets(self):
764     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
765     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
766                       set(self.instances))
767     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
768                       set(self.instances))
769     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
770                       set(self.nodes))
771     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
772                       set(self.nodes))
773     self.GL.release(locking.LEVEL_NODE)
774     self.GL.release(locking.LEVEL_INSTANCE)
775     self.GL.release(locking.LEVEL_CLUSTER)
776
777   def testAcquireWholeAndPartial(self):
778     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
779     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
780                       set(self.instances))
781     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
782                       set(self.instances))
783     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
784                       set(['n2']))
785     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
786                       set(['n2']))
787     self.GL.release(locking.LEVEL_NODE)
788     self.GL.release(locking.LEVEL_INSTANCE)
789     self.GL.release(locking.LEVEL_CLUSTER)
790
791   def testBGLDependency(self):
792     self.assertRaises(AssertionError, self.GL.acquire,
793                       locking.LEVEL_NODE, ['n1', 'n2'])
794     self.assertRaises(AssertionError, self.GL.acquire,
795                       locking.LEVEL_INSTANCE, ['i3'])
796     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
797     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
798     self.assertRaises(AssertionError, self.GL.release,
799                       locking.LEVEL_CLUSTER, ['BGL'])
800     self.assertRaises(AssertionError, self.GL.release,
801                       locking.LEVEL_CLUSTER)
802     self.GL.release(locking.LEVEL_NODE)
803     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
804     self.assertRaises(AssertionError, self.GL.release,
805                       locking.LEVEL_CLUSTER, ['BGL'])
806     self.assertRaises(AssertionError, self.GL.release,
807                       locking.LEVEL_CLUSTER)
808     self.GL.release(locking.LEVEL_INSTANCE)
809
810   def testWrongOrder(self):
811     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
812     self.GL.acquire(locking.LEVEL_NODE, ['n2'])
813     self.assertRaises(AssertionError, self.GL.acquire,
814                       locking.LEVEL_NODE, ['n1'])
815     self.assertRaises(AssertionError, self.GL.acquire,
816                       locking.LEVEL_INSTANCE, ['i2'])
817
  # Helper function to run as a thread that shares the BGL and then acquires
  # some locks at another level.
820   def _doLock(self, level, names, shared):
821     try:
822       self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
823       self.GL.acquire(level, names, shared=shared)
824       self.done.put('DONE')
825       self.GL.release(level)
826       self.GL.release(locking.LEVEL_CLUSTER)
827     except errors.LockError:
828       self.done.put('ERR')
829
830   @_Repeat
831   def testConcurrency(self):
832     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
833     self._addThread(target=self._doLock,
834                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
835     self._waitThreads()
836     self.assertEqual(self.done.get_nowait(), 'DONE')
837     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
838     self._addThread(target=self._doLock,
839                     args=(locking.LEVEL_INSTANCE, 'i1', 1))
840     self._waitThreads()
841     self.assertEqual(self.done.get_nowait(), 'DONE')
842     self._addThread(target=self._doLock,
843                     args=(locking.LEVEL_INSTANCE, 'i3', 1))
844     self.assertRaises(Queue.Empty, self.done.get_nowait)
845     self.GL.release(locking.LEVEL_INSTANCE)
846     self._waitThreads()
847     self.assertEqual(self.done.get_nowait(), 'DONE')
848     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
849     self._addThread(target=self._doLock,
850                     args=(locking.LEVEL_INSTANCE, 'i2', 1))
851     self._waitThreads()
852     self.assertEqual(self.done.get_nowait(), 'DONE')
853     self._addThread(target=self._doLock,
854                     args=(locking.LEVEL_INSTANCE, 'i2', 0))
855     self.assertRaises(Queue.Empty, self.done.get_nowait)
856     self.GL.release(locking.LEVEL_INSTANCE)
857     self._waitThreads()
858     self.assertEqual(self.done.get(True, 1), 'DONE')
859     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
860
861
if __name__ == '__main__':
  # Run every test case defined in this module.
  unittest.main()
  # Alternative kept for debugging: comment out the call above and enable
  # the two lines below to run a single test case with verbose output.
  # NOTE(review): TestSharedLock is presumably defined earlier in this
  # file -- confirm before enabling.
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
  #unittest.TextTestRunner(verbosity=2).run(suite)