Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 5a9c3f46

History | View | Annotate | Download (32 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29

    
30
from ganeti import locking
31
from ganeti import errors
32
from threading import Thread
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42
def _Repeat(fn):
43
  """Decorator for executing a function many times"""
44
  def wrapper(*args, **kwargs):
45
    for i in ITERATIONS:
46
      fn(*args, **kwargs)
47
  return wrapper
48

    
49
class _ThreadedTestCase(unittest.TestCase):
50
  """Test class that supports adding/waiting on threads"""
51
  def setUp(self):
52
    unittest.TestCase.setUp(self)
53
    self.threads = []
54

    
55
  def _addThread(self, *args, **kwargs):
56
    """Create and remember a new thread"""
57
    t = Thread(*args, **kwargs)
58
    self.threads.append(t)
59
    t.start()
60
    return t
61

    
62
  def _waitThreads(self):
63
    """Wait for all our threads to finish"""
64
    for t in self.threads:
65
      t.join(60)
66
      self.failIf(t.isAlive())
67
    self.threads = []
68

    
69

    
70
class TestSharedLock(_ThreadedTestCase):
71
  """SharedLock tests"""
72

    
73
  def setUp(self):
74
    _ThreadedTestCase.setUp(self)
75
    self.sl = locking.SharedLock()
76
    # helper threads use the 'done' queue to tell the master they finished.
77
    self.done = Queue.Queue(0)
78

    
79
  def testSequenceAndOwnership(self):
80
    self.assert_(not self.sl._is_owned())
81
    self.sl.acquire(shared=1)
82
    self.assert_(self.sl._is_owned())
83
    self.assert_(self.sl._is_owned(shared=1))
84
    self.assert_(not self.sl._is_owned(shared=0))
85
    self.sl.release()
86
    self.assert_(not self.sl._is_owned())
87
    self.sl.acquire()
88
    self.assert_(self.sl._is_owned())
89
    self.assert_(not self.sl._is_owned(shared=1))
90
    self.assert_(self.sl._is_owned(shared=0))
91
    self.sl.release()
92
    self.assert_(not self.sl._is_owned())
93
    self.sl.acquire(shared=1)
94
    self.assert_(self.sl._is_owned())
95
    self.assert_(self.sl._is_owned(shared=1))
96
    self.assert_(not self.sl._is_owned(shared=0))
97
    self.sl.release()
98
    self.assert_(not self.sl._is_owned())
99

    
100
  def testBooleanValue(self):
101
    # semaphores are supposed to return a true value on a successful acquire
102
    self.assert_(self.sl.acquire(shared=1))
103
    self.sl.release()
104
    self.assert_(self.sl.acquire())
105
    self.sl.release()
106

    
107
  def testDoubleLockingStoE(self):
108
    self.sl.acquire(shared=1)
109
    self.assertRaises(AssertionError, self.sl.acquire)
110

    
111
  def testDoubleLockingEtoS(self):
112
    self.sl.acquire()
113
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
114

    
115
  def testDoubleLockingStoS(self):
116
    self.sl.acquire(shared=1)
117
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
118

    
119
  def testDoubleLockingEtoE(self):
120
    self.sl.acquire()
121
    self.assertRaises(AssertionError, self.sl.acquire)
122

    
123
  # helper functions: called in a separate thread they acquire the lock, send
124
  # their identifier on the done queue, then release it.
125
  def _doItSharer(self):
126
    try:
127
      self.sl.acquire(shared=1)
128
      self.done.put('SHR')
129
      self.sl.release()
130
    except errors.LockError:
131
      self.done.put('ERR')
132

    
133
  def _doItExclusive(self):
134
    try:
135
      self.sl.acquire()
136
      self.done.put('EXC')
137
      self.sl.release()
138
    except errors.LockError:
139
      self.done.put('ERR')
140

    
141
  def _doItDelete(self):
142
    try:
143
      self.sl.delete()
144
      self.done.put('DEL')
145
    except errors.LockError:
146
      self.done.put('ERR')
147

    
148
  def testSharersCanCoexist(self):
149
    self.sl.acquire(shared=1)
150
    Thread(target=self._doItSharer).start()
151
    self.assert_(self.done.get(True, 1))
152
    self.sl.release()
153

    
154
  @_Repeat
155
  def testExclusiveBlocksExclusive(self):
156
    self.sl.acquire()
157
    self._addThread(target=self._doItExclusive)
158
    self.assertRaises(Queue.Empty, self.done.get_nowait)
159
    self.sl.release()
160
    self._waitThreads()
161
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
162

    
163
  @_Repeat
164
  def testExclusiveBlocksDelete(self):
165
    self.sl.acquire()
166
    self._addThread(target=self._doItDelete)
167
    self.assertRaises(Queue.Empty, self.done.get_nowait)
168
    self.sl.release()
169
    self._waitThreads()
170
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
171
    self.sl = locking.SharedLock()
172

    
173
  @_Repeat
174
  def testExclusiveBlocksSharer(self):
175
    self.sl.acquire()
176
    self._addThread(target=self._doItSharer)
177
    self.assertRaises(Queue.Empty, self.done.get_nowait)
178
    self.sl.release()
179
    self._waitThreads()
180
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
181

    
182
  @_Repeat
183
  def testSharerBlocksExclusive(self):
184
    self.sl.acquire(shared=1)
185
    self._addThread(target=self._doItExclusive)
186
    self.assertRaises(Queue.Empty, self.done.get_nowait)
187
    self.sl.release()
188
    self._waitThreads()
189
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
190

    
191
  @_Repeat
192
  def testSharerBlocksDelete(self):
193
    self.sl.acquire(shared=1)
194
    self._addThread(target=self._doItDelete)
195
    self.assertRaises(Queue.Empty, self.done.get_nowait)
196
    self.sl.release()
197
    self._waitThreads()
198
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
199
    self.sl = locking.SharedLock()
200

    
201
  @_Repeat
202
  def testWaitingExclusiveBlocksSharer(self):
203
    """SKIPPED testWaitingExclusiveBlockSharer"""
204
    return
205

    
206
    self.sl.acquire(shared=1)
207
    # the lock is acquired in shared mode...
208
    self._addThread(target=self._doItExclusive)
209
    # ...but now an exclusive is waiting...
210
    self._addThread(target=self._doItSharer)
211
    # ...so the sharer should be blocked as well
212
    self.assertRaises(Queue.Empty, self.done.get_nowait)
213
    self.sl.release()
214
    self._waitThreads()
215
    # The exclusive passed before
216
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
217
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
218

    
219
  @_Repeat
220
  def testWaitingSharerBlocksExclusive(self):
221
    """SKIPPED testWaitingSharerBlocksExclusive"""
222
    return
223

    
224
    self.sl.acquire()
225
    # the lock is acquired in exclusive mode...
226
    self._addThread(target=self._doItSharer)
227
    # ...but now a sharer is waiting...
228
    self._addThread(target=self._doItExclusive)
229
    # ...the exclusive is waiting too...
230
    self.assertRaises(Queue.Empty, self.done.get_nowait)
231
    self.sl.release()
232
    self._waitThreads()
233
    # The sharer passed before
234
    self.assertEqual(self.done.get_nowait(), 'SHR')
235
    self.assertEqual(self.done.get_nowait(), 'EXC')
236

    
237
  def testNoNonBlocking(self):
238
    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
239
    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
240
    self.sl.acquire()
241
    self.sl.delete(blocking=0) # Fine, because the lock is already acquired
242

    
243
  def testDelete(self):
244
    self.sl.delete()
245
    self.assertRaises(errors.LockError, self.sl.acquire)
246
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
247
    self.assertRaises(errors.LockError, self.sl.delete)
248

    
249
  def testNoDeleteIfSharer(self):
250
    self.sl.acquire(shared=1)
251
    self.assertRaises(AssertionError, self.sl.delete)
252

    
253
  @_Repeat
254
  def testDeletePendingSharersExclusiveDelete(self):
255
    self.sl.acquire()
256
    self._addThread(target=self._doItSharer)
257
    self._addThread(target=self._doItSharer)
258
    self._addThread(target=self._doItExclusive)
259
    self._addThread(target=self._doItDelete)
260
    self.sl.delete()
261
    self._waitThreads()
262
    # The threads who were pending return ERR
263
    for _ in range(4):
264
      self.assertEqual(self.done.get_nowait(), 'ERR')
265
    self.sl = locking.SharedLock()
266

    
267
  @_Repeat
268
  def testDeletePendingDeleteExclusiveSharers(self):
269
    self.sl.acquire()
270
    self._addThread(target=self._doItDelete)
271
    self._addThread(target=self._doItExclusive)
272
    self._addThread(target=self._doItSharer)
273
    self._addThread(target=self._doItSharer)
274
    self.sl.delete()
275
    self._waitThreads()
276
    # The two threads who were pending return both ERR
277
    self.assertEqual(self.done.get_nowait(), 'ERR')
278
    self.assertEqual(self.done.get_nowait(), 'ERR')
279
    self.assertEqual(self.done.get_nowait(), 'ERR')
280
    self.assertEqual(self.done.get_nowait(), 'ERR')
281
    self.sl = locking.SharedLock()
282

    
283

    
284
class TestSSynchronizedDecorator(_ThreadedTestCase):
285
  """Shared Lock Synchronized decorator test"""
286

    
287
  def setUp(self):
288
    _ThreadedTestCase.setUp(self)
289
    # helper threads use the 'done' queue to tell the master they finished.
290
    self.done = Queue.Queue(0)
291

    
292
  @locking.ssynchronized(_decoratorlock)
293
  def _doItExclusive(self):
294
    self.assert_(_decoratorlock._is_owned())
295
    self.done.put('EXC')
296

    
297
  @locking.ssynchronized(_decoratorlock, shared=1)
298
  def _doItSharer(self):
299
    self.assert_(_decoratorlock._is_owned(shared=1))
300
    self.done.put('SHR')
301

    
302
  def testDecoratedFunctions(self):
303
    self._doItExclusive()
304
    self.assert_(not _decoratorlock._is_owned())
305
    self._doItSharer()
306
    self.assert_(not _decoratorlock._is_owned())
307

    
308
  def testSharersCanCoexist(self):
309
    _decoratorlock.acquire(shared=1)
310
    Thread(target=self._doItSharer).start()
311
    self.assert_(self.done.get(True, 1))
312
    _decoratorlock.release()
313

    
314
  @_Repeat
315
  def testExclusiveBlocksExclusive(self):
316
    _decoratorlock.acquire()
317
    self._addThread(target=self._doItExclusive)
318
    # give it a bit of time to check that it's not actually doing anything
319
    self.assertRaises(Queue.Empty, self.done.get_nowait)
320
    _decoratorlock.release()
321
    self._waitThreads()
322
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
323

    
324
  @_Repeat
325
  def testExclusiveBlocksSharer(self):
326
    _decoratorlock.acquire()
327
    self._addThread(target=self._doItSharer)
328
    self.assertRaises(Queue.Empty, self.done.get_nowait)
329
    _decoratorlock.release()
330
    self._waitThreads()
331
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
332

    
333
  @_Repeat
334
  def testSharerBlocksExclusive(self):
335
    _decoratorlock.acquire(shared=1)
336
    self._addThread(target=self._doItExclusive)
337
    self.assertRaises(Queue.Empty, self.done.get_nowait)
338
    _decoratorlock.release()
339
    self._waitThreads()
340
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
341

    
342

    
343
class TestLockSet(_ThreadedTestCase):
344
  """LockSet tests"""
345

    
346
  def setUp(self):
347
    _ThreadedTestCase.setUp(self)
348
    self._setUpLS()
349
    # helper threads use the 'done' queue to tell the master they finished.
350
    self.done = Queue.Queue(0)
351

    
352
  def _setUpLS(self):
353
    """Helper to (re)initialize the lock set"""
354
    self.resources = ['one', 'two', 'three']
355
    self.ls = locking.LockSet(members=self.resources)
356

    
357

    
358
  def testResources(self):
359
    self.assertEquals(self.ls._names(), set(self.resources))
360
    newls = locking.LockSet()
361
    self.assertEquals(newls._names(), set())
362

    
363
  def testAcquireRelease(self):
364
    self.assert_(self.ls.acquire('one'))
365
    self.assertEquals(self.ls._list_owned(), set(['one']))
366
    self.ls.release()
367
    self.assertEquals(self.ls._list_owned(), set())
368
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
369
    self.assertEquals(self.ls._list_owned(), set(['one']))
370
    self.ls.release()
371
    self.assertEquals(self.ls._list_owned(), set())
372
    self.ls.acquire(['one', 'two', 'three'])
373
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
374
    self.ls.release('one')
375
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
376
    self.ls.release(['three'])
377
    self.assertEquals(self.ls._list_owned(), set(['two']))
378
    self.ls.release()
379
    self.assertEquals(self.ls._list_owned(), set())
380
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
381
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
382
    self.ls.release()
383
    self.assertEquals(self.ls._list_owned(), set())
384

    
385
  def testNoDoubleAcquire(self):
386
    self.ls.acquire('one')
387
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
388
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
389
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
390
    self.ls.release()
391
    self.ls.acquire(['one', 'three'])
392
    self.ls.release('one')
393
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
394
    self.ls.release('three')
395

    
396
  def testNoWrongRelease(self):
397
    self.assertRaises(AssertionError, self.ls.release)
398
    self.ls.acquire('one')
399
    self.assertRaises(AssertionError, self.ls.release, 'two')
400

    
401
  def testAddRemove(self):
402
    self.ls.add('four')
403
    self.assertEquals(self.ls._list_owned(), set())
404
    self.assert_('four' in self.ls._names())
405
    self.ls.add(['five', 'six', 'seven'], acquired=1)
406
    self.assert_('five' in self.ls._names())
407
    self.assert_('six' in self.ls._names())
408
    self.assert_('seven' in self.ls._names())
409
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
410
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
411
    self.assert_('five' not in self.ls._names())
412
    self.assert_('six' not in self.ls._names())
413
    self.assertEquals(self.ls._list_owned(), set(['seven']))
414
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
415
    self.ls.remove('seven')
416
    self.assert_('seven' not in self.ls._names())
417
    self.assertEquals(self.ls._list_owned(), set([]))
418
    self.ls.acquire(None, shared=1)
419
    self.assertRaises(AssertionError, self.ls.add, 'eight')
420
    self.ls.release()
421
    self.ls.acquire(None)
422
    self.ls.add('eight', acquired=1)
423
    self.assert_('eight' in self.ls._names())
424
    self.assert_('eight' in self.ls._list_owned())
425
    self.ls.add('nine')
426
    self.assert_('nine' in self.ls._names())
427
    self.assert_('nine' not in self.ls._list_owned())
428
    self.ls.release()
429
    self.ls.remove(['two'])
430
    self.assert_('two' not in self.ls._names())
431
    self.ls.acquire('three')
432
    self.assertEquals(self.ls.remove(['three']), ['three'])
433
    self.assert_('three' not in self.ls._names())
434
    self.assertEquals(self.ls.remove('three'), [])
435
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
436
    self.assert_('one' not in self.ls._names())
437

    
438
  def testRemoveNonBlocking(self):
439
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
440
    self.ls.acquire('one')
441
    self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
442
    self.ls.acquire(['two', 'three'])
443
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
444
                      ['two', 'three'])
445

    
446
  def testNoDoubleAdd(self):
447
    self.assertRaises(errors.LockError, self.ls.add, 'two')
448
    self.ls.add('four')
449
    self.assertRaises(errors.LockError, self.ls.add, 'four')
450

    
451
  def testNoWrongRemoves(self):
452
    self.ls.acquire(['one', 'three'], shared=1)
453
    # Cannot remove 'two' while holding something which is not a superset
454
    self.assertRaises(AssertionError, self.ls.remove, 'two')
455
    # Cannot remove 'three' as we are sharing it
456
    self.assertRaises(AssertionError, self.ls.remove, 'three')
457

    
458
  def testAcquireSetLock(self):
459
    # acquire the set-lock exclusively
460
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
461
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
462
    self.assertEquals(self.ls._is_owned(), True)
463
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
464
    # I can still add/remove elements...
465
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
466
    self.assert_(self.ls.add('six'))
467
    self.ls.release()
468
    # share the set-lock
469
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
470
    # adding new elements is not possible
471
    self.assertRaises(AssertionError, self.ls.add, 'five')
472
    self.ls.release()
473

    
474
  def testAcquireWithRepetitions(self):
475
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
476
                      set(['two', 'two', 'three']))
477
    self.ls.release(['two', 'two'])
478
    self.assertEquals(self.ls._list_owned(), set(['three']))
479

    
480
  def testEmptyAcquire(self):
481
    # Acquire an empty list of locks...
482
    self.assertEquals(self.ls.acquire([]), set())
483
    self.assertEquals(self.ls._list_owned(), set())
484
    # New locks can still be addded
485
    self.assert_(self.ls.add('six'))
486
    # "re-acquiring" is not an issue, since we had really acquired nothing
487
    self.assertEquals(self.ls.acquire([], shared=1), set())
488
    self.assertEquals(self.ls._list_owned(), set())
489
    # We haven't really acquired anything, so we cannot release
490
    self.assertRaises(AssertionError, self.ls.release)
491

    
492
  def _doLockSet(self, set, shared):
493
    try:
494
      self.ls.acquire(set, shared=shared)
495
      self.done.put('DONE')
496
      self.ls.release()
497
    except errors.LockError:
498
      self.done.put('ERR')
499

    
500
  def _doAddSet(self, set):
501
    try:
502
      self.ls.add(set, acquired=1)
503
      self.done.put('DONE')
504
      self.ls.release()
505
    except errors.LockError:
506
      self.done.put('ERR')
507

    
508
  def _doRemoveSet(self, set):
509
    self.done.put(self.ls.remove(set))
510

    
511
  @_Repeat
512
  def testConcurrentSharedAcquire(self):
513
    self.ls.acquire(['one', 'two'], shared=1)
514
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
515
    self._waitThreads()
516
    self.assertEqual(self.done.get_nowait(), 'DONE')
517
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
518
    self._waitThreads()
519
    self.assertEqual(self.done.get_nowait(), 'DONE')
520
    self._addThread(target=self._doLockSet, args=('three', 1))
521
    self._waitThreads()
522
    self.assertEqual(self.done.get_nowait(), 'DONE')
523
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
524
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
525
    self.assertRaises(Queue.Empty, self.done.get_nowait)
526
    self.ls.release()
527
    self._waitThreads()
528
    self.assertEqual(self.done.get_nowait(), 'DONE')
529
    self.assertEqual(self.done.get_nowait(), 'DONE')
530

    
531
  @_Repeat
532
  def testConcurrentExclusiveAcquire(self):
533
    self.ls.acquire(['one', 'two'])
534
    self._addThread(target=self._doLockSet, args=('three', 1))
535
    self._waitThreads()
536
    self.assertEqual(self.done.get_nowait(), 'DONE')
537
    self._addThread(target=self._doLockSet, args=('three', 0))
538
    self._waitThreads()
539
    self.assertEqual(self.done.get_nowait(), 'DONE')
540
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
541
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
542
    self._addThread(target=self._doLockSet, args=('one', 0))
543
    self._addThread(target=self._doLockSet, args=('one', 1))
544
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
545
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
546
    self.assertRaises(Queue.Empty, self.done.get_nowait)
547
    self.ls.release()
548
    self._waitThreads()
549
    for _ in range(6):
550
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
551

    
552
  @_Repeat
553
  def testConcurrentRemove(self):
554
    self.ls.add('four')
555
    self.ls.acquire(['one', 'two', 'four'])
556
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
557
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
558
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
559
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
560
    self.assertRaises(Queue.Empty, self.done.get_nowait)
561
    self.ls.remove('one')
562
    self.ls.release()
563
    self._waitThreads()
564
    for i in range(4):
565
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
566
    self.ls.add(['five', 'six'], acquired=1)
567
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
568
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
569
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
570
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
571
    self.ls.remove('five')
572
    self.ls.release()
573
    self._waitThreads()
574
    for i in range(4):
575
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
576
    self.ls.acquire(['three', 'four'])
577
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
578
    self.assertRaises(Queue.Empty, self.done.get_nowait)
579
    self.ls.remove('four')
580
    self._waitThreads()
581
    self.assertEqual(self.done.get_nowait(), ['six'])
582
    self._addThread(target=self._doRemoveSet, args=(['two']))
583
    self._waitThreads()
584
    self.assertEqual(self.done.get_nowait(), ['two'])
585
    self.ls.release()
586
    # reset lockset
587
    self._setUpLS()
588

    
589
  @_Repeat
590
  def testConcurrentSharedSetLock(self):
591
    # share the set-lock...
592
    self.ls.acquire(None, shared=1)
593
    # ...another thread can share it too
594
    self._addThread(target=self._doLockSet, args=(None, 1))
595
    self._waitThreads()
596
    self.assertEqual(self.done.get_nowait(), 'DONE')
597
    # ...or just share some elements
598
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
599
    self._waitThreads()
600
    self.assertEqual(self.done.get_nowait(), 'DONE')
601
    # ...but not add new ones or remove any
602
    t = self._addThread(target=self._doAddSet, args=(['nine']))
603
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
604
    self.assertRaises(Queue.Empty, self.done.get_nowait)
605
    # this just releases the set-lock
606
    self.ls.release([])
607
    t.join(60)
608
    self.assertEqual(self.done.get_nowait(), 'DONE')
609
    # release the lock on the actual elements so remove() can proceed too
610
    self.ls.release()
611
    self._waitThreads()
612
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
613
    # reset lockset
614
    self._setUpLS()
615

    
616
  @_Repeat
617
  def testConcurrentExclusiveSetLock(self):
618
    # acquire the set-lock...
619
    self.ls.acquire(None, shared=0)
620
    # ...no one can do anything else
621
    self._addThread(target=self._doLockSet, args=(None, 1))
622
    self._addThread(target=self._doLockSet, args=(None, 0))
623
    self._addThread(target=self._doLockSet, args=(['three'], 0))
624
    self._addThread(target=self._doLockSet, args=(['two'], 1))
625
    self._addThread(target=self._doAddSet, args=(['nine']))
626
    self.assertRaises(Queue.Empty, self.done.get_nowait)
627
    self.ls.release()
628
    self._waitThreads()
629
    for _ in range(5):
630
      self.assertEqual(self.done.get(True, 1), 'DONE')
631
    # cleanup
632
    self._setUpLS()
633

    
634
  @_Repeat
635
  def testConcurrentSetLockAdd(self):
636
    self.ls.acquire('one')
637
    # Another thread wants the whole SetLock
638
    self._addThread(target=self._doLockSet, args=(None, 0))
639
    self._addThread(target=self._doLockSet, args=(None, 1))
640
    self.assertRaises(Queue.Empty, self.done.get_nowait)
641
    self.assertRaises(AssertionError, self.ls.add, 'four')
642
    self.ls.release()
643
    self._waitThreads()
644
    self.assertEqual(self.done.get_nowait(), 'DONE')
645
    self.assertEqual(self.done.get_nowait(), 'DONE')
646
    self.ls.acquire(None)
647
    self._addThread(target=self._doLockSet, args=(None, 0))
648
    self._addThread(target=self._doLockSet, args=(None, 1))
649
    self.assertRaises(Queue.Empty, self.done.get_nowait)
650
    self.ls.add('four')
651
    self.ls.add('five', acquired=1)
652
    self.ls.add('six', acquired=1, shared=1)
653
    self.assertEquals(self.ls._list_owned(),
654
      set(['one', 'two', 'three', 'five', 'six']))
655
    self.assertEquals(self.ls._is_owned(), True)
656
    self.assertEquals(self.ls._names(),
657
      set(['one', 'two', 'three', 'four', 'five', 'six']))
658
    self.ls.release()
659
    self._waitThreads()
660
    self.assertEqual(self.done.get_nowait(), 'DONE')
661
    self.assertEqual(self.done.get_nowait(), 'DONE')
662
    self._setUpLS()
663

    
664
  @_Repeat
665
  def testEmptyLockSet(self):
666
    # get the set-lock
667
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
668
    # now empty it...
669
    self.ls.remove(['one', 'two', 'three'])
670
    # and adds/locks by another thread still wait
671
    self._addThread(target=self._doAddSet, args=(['nine']))
672
    self._addThread(target=self._doLockSet, args=(None, 1))
673
    self._addThread(target=self._doLockSet, args=(None, 0))
674
    self.assertRaises(Queue.Empty, self.done.get_nowait)
675
    self.ls.release()
676
    self._waitThreads()
677
    for _ in range(3):
678
      self.assertEqual(self.done.get_nowait(), 'DONE')
679
    # empty it again...
680
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
681
    # now share it...
682
    self.assertEqual(self.ls.acquire(None, shared=1), set())
683
    # other sharers can go, adds still wait
684
    self._addThread(target=self._doLockSet, args=(None, 1))
685
    self._waitThreads()
686
    self.assertEqual(self.done.get_nowait(), 'DONE')
687
    self._addThread(target=self._doAddSet, args=(['nine']))
688
    self.assertRaises(Queue.Empty, self.done.get_nowait)
689
    self.ls.release()
690
    self._waitThreads()
691
    self.assertEqual(self.done.get_nowait(), 'DONE')
692
    self._setUpLS()
693

    
694

    
695
class TestGanetiLockManager(_ThreadedTestCase):
696

    
697
  def setUp(self):
698
    _ThreadedTestCase.setUp(self)
699
    self.nodes=['n1', 'n2']
700
    self.instances=['i1', 'i2', 'i3']
701
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
702
                                        instances=self.instances)
703
    self.done = Queue.Queue(0)
704

    
705
  def tearDown(self):
706
    # Don't try this at home...
707
    locking.GanetiLockManager._instance = None
708

    
709
  def testLockingConstants(self):
710
    # The locking library internally cheats by assuming its constants have some
711
    # relationships with each other. Check those hold true.
712
    # This relationship is also used in the Processor to recursively acquire
713
    # the right locks. Again, please don't break it.
714
    for i in range(len(locking.LEVELS)):
715
      self.assertEqual(i, locking.LEVELS[i])
716

    
717
  def testDoubleGLFails(self):
718
    self.assertRaises(AssertionError, locking.GanetiLockManager)
719

    
720
  def testLockNames(self):
721
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
722
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
723
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
724
                     set(self.instances))
725

    
726
  def testInitAndResources(self):
727
    locking.GanetiLockManager._instance = None
728
    self.GL = locking.GanetiLockManager()
729
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
730
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
731
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
732

    
733
    locking.GanetiLockManager._instance = None
734
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
735
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
736
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
737
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
738

    
739
    locking.GanetiLockManager._instance = None
740
    self.GL = locking.GanetiLockManager(instances=self.instances)
741
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
742
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
743
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
744
                     set(self.instances))
745

    
746
  def testAcquireRelease(self):
747
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
748
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
749
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
750
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
751
    self.GL.release(locking.LEVEL_NODE, ['n2'])
752
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
753
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
754
    self.GL.release(locking.LEVEL_NODE)
755
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
756
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
757
    self.GL.release(locking.LEVEL_INSTANCE)
758
    self.assertRaises(errors.LockError, self.GL.acquire,
759
                      locking.LEVEL_INSTANCE, ['i5'])
760
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
761
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
762

    
763
  def testAcquireWholeSets(self):
764
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
765
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
766
                      set(self.instances))
767
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
768
                      set(self.instances))
769
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
770
                      set(self.nodes))
771
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
772
                      set(self.nodes))
773
    self.GL.release(locking.LEVEL_NODE)
774
    self.GL.release(locking.LEVEL_INSTANCE)
775
    self.GL.release(locking.LEVEL_CLUSTER)
776

    
777
  def testAcquireWholeAndPartial(self):
778
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
779
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
780
                      set(self.instances))
781
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
782
                      set(self.instances))
783
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
784
                      set(['n2']))
785
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
786
                      set(['n2']))
787
    self.GL.release(locking.LEVEL_NODE)
788
    self.GL.release(locking.LEVEL_INSTANCE)
789
    self.GL.release(locking.LEVEL_CLUSTER)
790

    
791
  def testBGLDependency(self):
792
    self.assertRaises(AssertionError, self.GL.acquire,
793
                      locking.LEVEL_NODE, ['n1', 'n2'])
794
    self.assertRaises(AssertionError, self.GL.acquire,
795
                      locking.LEVEL_INSTANCE, ['i3'])
796
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
797
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
798
    self.assertRaises(AssertionError, self.GL.release,
799
                      locking.LEVEL_CLUSTER, ['BGL'])
800
    self.assertRaises(AssertionError, self.GL.release,
801
                      locking.LEVEL_CLUSTER)
802
    self.GL.release(locking.LEVEL_NODE)
803
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
804
    self.assertRaises(AssertionError, self.GL.release,
805
                      locking.LEVEL_CLUSTER, ['BGL'])
806
    self.assertRaises(AssertionError, self.GL.release,
807
                      locking.LEVEL_CLUSTER)
808
    self.GL.release(locking.LEVEL_INSTANCE)
809

    
810
  def testWrongOrder(self):
811
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
812
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
813
    self.assertRaises(AssertionError, self.GL.acquire,
814
                      locking.LEVEL_NODE, ['n1'])
815
    self.assertRaises(AssertionError, self.GL.acquire,
816
                      locking.LEVEL_INSTANCE, ['i2'])
817

    
818
  # Helper function to run as a thread that shared the BGL and then acquires
819
  # some locks at another level.
820
  def _doLock(self, level, names, shared):
821
    try:
822
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
823
      self.GL.acquire(level, names, shared=shared)
824
      self.done.put('DONE')
825
      self.GL.release(level)
826
      self.GL.release(locking.LEVEL_CLUSTER)
827
    except errors.LockError:
828
      self.done.put('ERR')
829

    
830
  @_Repeat
831
  def testConcurrency(self):
832
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
833
    self._addThread(target=self._doLock,
834
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
835
    self._waitThreads()
836
    self.assertEqual(self.done.get_nowait(), 'DONE')
837
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
838
    self._addThread(target=self._doLock,
839
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
840
    self._waitThreads()
841
    self.assertEqual(self.done.get_nowait(), 'DONE')
842
    self._addThread(target=self._doLock,
843
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
844
    self.assertRaises(Queue.Empty, self.done.get_nowait)
845
    self.GL.release(locking.LEVEL_INSTANCE)
846
    self._waitThreads()
847
    self.assertEqual(self.done.get_nowait(), 'DONE')
848
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
849
    self._addThread(target=self._doLock,
850
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
851
    self._waitThreads()
852
    self.assertEqual(self.done.get_nowait(), 'DONE')
853
    self._addThread(target=self._doLock,
854
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
855
    self.assertRaises(Queue.Empty, self.done.get_nowait)
856
    self.GL.release(locking.LEVEL_INSTANCE)
857
    self._waitThreads()
858
    self.assertEqual(self.done.get(True, 1), 'DONE')
859
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
860

    
861

    
862
if __name__ == '__main__':
863
  unittest.main()
864
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
865
  #unittest.TextTestRunner(verbosity=2).run(suite)