LockSet implementation and unit tests
[ganeti-local] / test / ganeti.locking_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the locking module"""
23
24
25 import os
26 import unittest
27 import time
28 import Queue
29
30 from ganeti import locking
31 from ganeti import errors
32 from threading import Thread
33
34
35 class TestSharedLock(unittest.TestCase):
36   """SharedLock tests"""
37
38   def setUp(self):
39     self.sl = locking.SharedLock()
40     # helper threads use the 'done' queue to tell the master they finished.
41     self.done = Queue.Queue(0)
42
43   def testSequenceAndOwnership(self):
44     self.assert_(not self.sl._is_owned())
45     self.sl.acquire(shared=1)
46     self.assert_(self.sl._is_owned())
47     self.assert_(self.sl._is_owned(shared=1))
48     self.assert_(not self.sl._is_owned(shared=0))
49     self.sl.release()
50     self.assert_(not self.sl._is_owned())
51     self.sl.acquire()
52     self.assert_(self.sl._is_owned())
53     self.assert_(not self.sl._is_owned(shared=1))
54     self.assert_(self.sl._is_owned(shared=0))
55     self.sl.release()
56     self.assert_(not self.sl._is_owned())
57     self.sl.acquire(shared=1)
58     self.assert_(self.sl._is_owned())
59     self.assert_(self.sl._is_owned(shared=1))
60     self.assert_(not self.sl._is_owned(shared=0))
61     self.sl.release()
62     self.assert_(not self.sl._is_owned())
63
64   def testBooleanValue(self):
65     # semaphores are supposed to return a true value on a successful acquire
66     self.assert_(self.sl.acquire(shared=1))
67     self.sl.release()
68     self.assert_(self.sl.acquire())
69     self.sl.release()
70
71   def testDoubleLockingStoE(self):
72     self.sl.acquire(shared=1)
73     self.assertRaises(AssertionError, self.sl.acquire)
74
75   def testDoubleLockingEtoS(self):
76     self.sl.acquire()
77     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
78
79   def testDoubleLockingStoS(self):
80     self.sl.acquire(shared=1)
81     self.assertRaises(AssertionError, self.sl.acquire, shared=1)
82
83   def testDoubleLockingEtoE(self):
84     self.sl.acquire()
85     self.assertRaises(AssertionError, self.sl.acquire)
86
87   # helper functions: called in a separate thread they acquire the lock, send
88   # their identifier on the done queue, then release it.
89   def _doItSharer(self):
90     try:
91       self.sl.acquire(shared=1)
92       self.done.put('SHR')
93       self.sl.release()
94     except errors.LockError:
95       self.done.put('ERR')
96
97   def _doItExclusive(self):
98     try:
99       self.sl.acquire()
100       self.done.put('EXC')
101       self.sl.release()
102     except errors.LockError:
103       self.done.put('ERR')
104
105   def _doItDelete(self):
106     try:
107       self.sl.delete()
108       self.done.put('DEL')
109     except errors.LockError:
110       self.done.put('ERR')
111
112   def testSharersCanCoexist(self):
113     self.sl.acquire(shared=1)
114     Thread(target=self._doItSharer).start()
115     self.assert_(self.done.get(True, 1))
116     self.sl.release()
117
118   def testExclusiveBlocksExclusive(self):
119     self.sl.acquire()
120     Thread(target=self._doItExclusive).start()
121     # give it a bit of time to check that it's not actually doing anything
122     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
123     self.sl.release()
124     self.assert_(self.done.get(True, 1))
125
126   def testExclusiveBlocksDelete(self):
127     self.sl.acquire()
128     Thread(target=self._doItDelete).start()
129     # give it a bit of time to check that it's not actually doing anything
130     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
131     self.sl.release()
132     self.assert_(self.done.get(True, 1))
133
134   def testExclusiveBlocksSharer(self):
135     self.sl.acquire()
136     Thread(target=self._doItSharer).start()
137     time.sleep(0.05)
138     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
139     self.sl.release()
140     self.assert_(self.done.get(True, 1))
141
142   def testSharerBlocksExclusive(self):
143     self.sl.acquire(shared=1)
144     Thread(target=self._doItExclusive).start()
145     time.sleep(0.05)
146     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
147     self.sl.release()
148     self.assert_(self.done.get(True, 1))
149
150   def testSharerBlocksDelete(self):
151     self.sl.acquire(shared=1)
152     Thread(target=self._doItDelete).start()
153     time.sleep(0.05)
154     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
155     self.sl.release()
156     self.assert_(self.done.get(True, 1))
157
158   def testWaitingExclusiveBlocksSharer(self):
159     self.sl.acquire(shared=1)
160     # the lock is acquired in shared mode...
161     Thread(target=self._doItExclusive).start()
162     # ...but now an exclusive is waiting...
163     time.sleep(0.05)
164     Thread(target=self._doItSharer).start()
165     # ...so the sharer should be blocked as well
166     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
167     self.sl.release()
168     # The exclusive passed before
169     self.assertEqual(self.done.get(True, 1), 'EXC')
170     self.assertEqual(self.done.get(True, 1), 'SHR')
171
172   def testWaitingSharerBlocksExclusive(self):
173     self.sl.acquire()
174     # the lock is acquired in exclusive mode...
175     Thread(target=self._doItSharer).start()
176     # ...but now a sharer is waiting...
177     time.sleep(0.05)
178     Thread(target=self._doItExclusive).start()
179     # ...the exclusive is waiting too...
180     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
181     self.sl.release()
182     # The sharer passed before
183     self.assertEqual(self.done.get(True, 1), 'SHR')
184     self.assertEqual(self.done.get(True, 1), 'EXC')
185
186   def testNoNonBlocking(self):
187     self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
188     self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
189     self.sl.acquire()
190     self.sl.delete(blocking=0) # Fine, because the lock is already acquired
191
192   def testDelete(self):
193     self.sl.delete()
194     self.assertRaises(errors.LockError, self.sl.acquire)
195     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
196     self.assertRaises(errors.LockError, self.sl.delete)
197
198   def testNoDeleteIfSharer(self):
199     self.sl.acquire(shared=1)
200     self.assertRaises(AssertionError, self.sl.delete)
201
202   def testDeletePendingSharersExclusiveDelete(self):
203     self.sl.acquire()
204     Thread(target=self._doItSharer).start()
205     Thread(target=self._doItSharer).start()
206     time.sleep(0.05)
207     Thread(target=self._doItExclusive).start()
208     Thread(target=self._doItDelete).start()
209     time.sleep(0.05)
210     self.sl.delete()
211     # The two threads who were pending return both ERR
212     self.assertEqual(self.done.get(True, 1), 'ERR')
213     self.assertEqual(self.done.get(True, 1), 'ERR')
214     self.assertEqual(self.done.get(True, 1), 'ERR')
215     self.assertEqual(self.done.get(True, 1), 'ERR')
216
217   def testDeletePendingDeleteExclusiveSharers(self):
218     self.sl.acquire()
219     Thread(target=self._doItDelete).start()
220     Thread(target=self._doItExclusive).start()
221     time.sleep(0.05)
222     Thread(target=self._doItSharer).start()
223     Thread(target=self._doItSharer).start()
224     time.sleep(0.05)
225     self.sl.delete()
226     # The two threads who were pending return both ERR
227     self.assertEqual(self.done.get(True, 1), 'ERR')
228     self.assertEqual(self.done.get(True, 1), 'ERR')
229     self.assertEqual(self.done.get(True, 1), 'ERR')
230     self.assertEqual(self.done.get(True, 1), 'ERR')
231
232
233 class TestLockSet(unittest.TestCase):
234   """LockSet tests"""
235
236   def setUp(self):
237     self.resources = ['one', 'two', 'three']
238     self.ls = locking.LockSet(self.resources)
239     # helper threads use the 'done' queue to tell the master they finished.
240     self.done = Queue.Queue(0)
241
242   def testResources(self):
243     self.assertEquals(self.ls._names(), set(self.resources))
244     newls = locking.LockSet()
245     self.assertEquals(newls._names(), set())
246
247   def testAcquireRelease(self):
248     self.ls.acquire('one')
249     self.assertEquals(self.ls._list_owned(), set(['one']))
250     self.ls.release()
251     self.assertEquals(self.ls._list_owned(), set())
252     self.ls.acquire(['one'])
253     self.assertEquals(self.ls._list_owned(), set(['one']))
254     self.ls.release()
255     self.assertEquals(self.ls._list_owned(), set())
256     self.ls.acquire(['one', 'two', 'three'])
257     self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
258     self.ls.release('one')
259     self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
260     self.ls.release(['three'])
261     self.assertEquals(self.ls._list_owned(), set(['two']))
262     self.ls.release()
263     self.assertEquals(self.ls._list_owned(), set())
264     self.ls.acquire(['one', 'three'])
265     self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
266     self.ls.release()
267     self.assertEquals(self.ls._list_owned(), set())
268
269   def testNoDoubleAcquire(self):
270     self.ls.acquire('one')
271     self.assertRaises(AssertionError, self.ls.acquire, 'one')
272     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
273     self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
274     self.ls.release()
275     self.ls.acquire(['one', 'three'])
276     self.ls.release('one')
277     self.assertRaises(AssertionError, self.ls.acquire, ['two'])
278     self.ls.release('three')
279
280   def testNoWrongRelease(self):
281     self.assertRaises(AssertionError, self.ls.release)
282     self.ls.acquire('one')
283     self.assertRaises(AssertionError, self.ls.release, 'two')
284
285   def testAddRemove(self):
286     self.ls.add('four')
287     self.assertEquals(self.ls._list_owned(), set())
288     self.assert_('four' in self.ls._names())
289     self.ls.add(['five', 'six', 'seven'], acquired=1)
290     self.assert_('five' in self.ls._names())
291     self.assert_('six' in self.ls._names())
292     self.assert_('seven' in self.ls._names())
293     self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
294     self.ls.remove(['five', 'six'])
295     self.assert_('five' not in self.ls._names())
296     self.assert_('six' not in self.ls._names())
297     self.assertEquals(self.ls._list_owned(), set(['seven']))
298     self.ls.add('eight', acquired=1, shared=1)
299     self.assert_('eight' in self.ls._names())
300     self.assertEquals(self.ls._list_owned(), set(['seven', 'eight']))
301     self.ls.remove('seven')
302     self.assert_('seven' not in self.ls._names())
303     self.assertEquals(self.ls._list_owned(), set(['eight']))
304     self.ls.release()
305     self.ls.remove(['two'])
306     self.assert_('two' not in self.ls._names())
307     self.ls.acquire('three')
308     self.ls.remove(['three'])
309     self.assert_('three' not in self.ls._names())
310     self.assertEquals(self.ls.remove('three'), ['three'])
311     self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six'])
312     self.assert_('one' not in self.ls._names())
313
314   def testRemoveNonBlocking(self):
315     self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
316     self.ls.acquire('one')
317     self.assertEquals(self.ls.remove('one', blocking=0), [])
318     self.ls.acquire(['two', 'three'])
319     self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), [])
320
321   def testNoDoubleAdd(self):
322     self.assertRaises(errors.LockError, self.ls.add, 'two')
323     self.ls.add('four')
324     self.assertRaises(errors.LockError, self.ls.add, 'four')
325
326   def testNoWrongRemoves(self):
327     self.ls.acquire(['one', 'three'], shared=1)
328     # Cannot remove 'two' while holding something which is not a superset
329     self.assertRaises(AssertionError, self.ls.remove, 'two')
330     # Cannot remove 'three' as we are sharing it
331     self.assertRaises(AssertionError, self.ls.remove, 'three')
332
333   def _doLockSet(self, set, shared):
334     try:
335       self.ls.acquire(set, shared=shared)
336       self.done.put('DONE')
337       self.ls.release()
338     except errors.LockError:
339       self.done.put('ERR')
340
341   def _doRemoveSet(self, set):
342     self.done.put(self.ls.remove(set))
343
344   def testConcurrentSharedAcquire(self):
345     self.ls.acquire(['one', 'two'], shared=1)
346     Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
347     self.assertEqual(self.done.get(True, 1), 'DONE')
348     Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
349     self.assertEqual(self.done.get(True, 1), 'DONE')
350     Thread(target=self._doLockSet, args=('three', 1)).start()
351     self.assertEqual(self.done.get(True, 1), 'DONE')
352     Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
353     Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
354     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
355     self.ls.release()
356     self.assertEqual(self.done.get(True, 1), 'DONE')
357     self.assertEqual(self.done.get(True, 1), 'DONE')
358
359   def testConcurrentExclusiveAcquire(self):
360     self.ls.acquire(['one', 'two'])
361     Thread(target=self._doLockSet, args=('three', 1)).start()
362     self.assertEqual(self.done.get(True, 1), 'DONE')
363     Thread(target=self._doLockSet, args=('three', 0)).start()
364     self.assertEqual(self.done.get(True, 1), 'DONE')
365     Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
366     Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
367     Thread(target=self._doLockSet, args=('one', 0)).start()
368     Thread(target=self._doLockSet, args=('one', 1)).start()
369     Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
370     Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
371     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
372     self.ls.release()
373     self.assertEqual(self.done.get(True, 1), 'DONE')
374     self.assertEqual(self.done.get(True, 1), 'DONE')
375     self.assertEqual(self.done.get(True, 1), 'DONE')
376     self.assertEqual(self.done.get(True, 1), 'DONE')
377     self.assertEqual(self.done.get(True, 1), 'DONE')
378     self.assertEqual(self.done.get(True, 1), 'DONE')
379
380   def testConcurrentRemove(self):
381     self.ls.add('four')
382     self.ls.acquire(['one', 'two', 'four'])
383     Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
384     Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
385     Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
386     Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
387     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
388     self.ls.remove('one')
389     self.ls.release()
390     self.assertEqual(self.done.get(True, 1), 'ERR')
391     self.assertEqual(self.done.get(True, 1), 'ERR')
392     self.assertEqual(self.done.get(True, 1), 'ERR')
393     self.assertEqual(self.done.get(True, 1), 'ERR')
394     self.ls.add(['five', 'six'], acquired=1)
395     Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
396     Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
397     Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
398     Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
399     self.ls.remove('five')
400     self.ls.release()
401     self.assertEqual(self.done.get(True, 1), 'DONE')
402     self.assertEqual(self.done.get(True, 1), 'DONE')
403     self.assertEqual(self.done.get(True, 1), 'DONE')
404     self.assertEqual(self.done.get(True, 1), 'DONE')
405     self.ls.acquire(['three', 'four'])
406     Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
407     self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
408     self.ls.remove('four')
409     self.assertEqual(self.done.get(True, 1), ['four'])
410     Thread(target=self._doRemoveSet, args=(['two'])).start()
411     self.assertEqual(self.done.get(True, 1), [])
412     self.ls.release()
413
414
if __name__ == '__main__':
  # Run every test case defined in this module when executed as a script.
  unittest.main()
  # Alternative, kept for debugging: run only TestSharedLock, verbosely.
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
  #unittest.TextTestRunner(verbosity=2).run(suite)