Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 84e344d4

History | View | Annotate | Download (37.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.threads = []
56

    
57
  def _addThread(self, *args, **kwargs):
58
    """Create and remember a new thread"""
59
    t = threading.Thread(*args, **kwargs)
60
    self.threads.append(t)
61
    t.start()
62
    return t
63

    
64
  def _waitThreads(self):
65
    """Wait for all our threads to finish"""
66
    for t in self.threads:
67
      t.join(60)
68
      self.failIf(t.isAlive())
69
    self.threads = []
70

    
71

    
72
class TestSharedLock(_ThreadedTestCase):
73
  """SharedLock tests"""
74

    
75
  def setUp(self):
76
    _ThreadedTestCase.setUp(self)
77
    self.sl = locking.SharedLock()
78
    # helper threads use the 'done' queue to tell the master they finished.
79
    self.done = Queue.Queue(0)
80

    
81
  def testSequenceAndOwnership(self):
82
    self.assert_(not self.sl._is_owned())
83
    self.sl.acquire(shared=1)
84
    self.assert_(self.sl._is_owned())
85
    self.assert_(self.sl._is_owned(shared=1))
86
    self.assert_(not self.sl._is_owned(shared=0))
87
    self.sl.release()
88
    self.assert_(not self.sl._is_owned())
89
    self.sl.acquire()
90
    self.assert_(self.sl._is_owned())
91
    self.assert_(not self.sl._is_owned(shared=1))
92
    self.assert_(self.sl._is_owned(shared=0))
93
    self.sl.release()
94
    self.assert_(not self.sl._is_owned())
95
    self.sl.acquire(shared=1)
96
    self.assert_(self.sl._is_owned())
97
    self.assert_(self.sl._is_owned(shared=1))
98
    self.assert_(not self.sl._is_owned(shared=0))
99
    self.sl.release()
100
    self.assert_(not self.sl._is_owned())
101

    
102
  def testBooleanValue(self):
103
    # semaphores are supposed to return a true value on a successful acquire
104
    self.assert_(self.sl.acquire(shared=1))
105
    self.sl.release()
106
    self.assert_(self.sl.acquire())
107
    self.sl.release()
108

    
109
  def testDoubleLockingStoE(self):
110
    self.sl.acquire(shared=1)
111
    self.assertRaises(AssertionError, self.sl.acquire)
112

    
113
  def testDoubleLockingEtoS(self):
114
    self.sl.acquire()
115
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
116

    
117
  def testDoubleLockingStoS(self):
118
    self.sl.acquire(shared=1)
119
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
120

    
121
  def testDoubleLockingEtoE(self):
122
    self.sl.acquire()
123
    self.assertRaises(AssertionError, self.sl.acquire)
124

    
125
  # helper functions: called in a separate thread they acquire the lock, send
126
  # their identifier on the done queue, then release it.
127
  def _doItSharer(self):
128
    try:
129
      self.sl.acquire(shared=1)
130
      self.done.put('SHR')
131
      self.sl.release()
132
    except errors.LockError:
133
      self.done.put('ERR')
134

    
135
  def _doItExclusive(self):
136
    try:
137
      self.sl.acquire()
138
      self.done.put('EXC')
139
      self.sl.release()
140
    except errors.LockError:
141
      self.done.put('ERR')
142

    
143
  def _doItDelete(self):
144
    try:
145
      self.sl.delete()
146
      self.done.put('DEL')
147
    except errors.LockError:
148
      self.done.put('ERR')
149

    
150
  def testSharersCanCoexist(self):
151
    self.sl.acquire(shared=1)
152
    threading.Thread(target=self._doItSharer).start()
153
    self.assert_(self.done.get(True, 1))
154
    self.sl.release()
155

    
156
  @_Repeat
157
  def testExclusiveBlocksExclusive(self):
158
    self.sl.acquire()
159
    self._addThread(target=self._doItExclusive)
160
    self.assertRaises(Queue.Empty, self.done.get_nowait)
161
    self.sl.release()
162
    self._waitThreads()
163
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
164

    
165
  @_Repeat
166
  def testExclusiveBlocksDelete(self):
167
    self.sl.acquire()
168
    self._addThread(target=self._doItDelete)
169
    self.assertRaises(Queue.Empty, self.done.get_nowait)
170
    self.sl.release()
171
    self._waitThreads()
172
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
173
    self.sl = locking.SharedLock()
174

    
175
  @_Repeat
176
  def testExclusiveBlocksSharer(self):
177
    self.sl.acquire()
178
    self._addThread(target=self._doItSharer)
179
    self.assertRaises(Queue.Empty, self.done.get_nowait)
180
    self.sl.release()
181
    self._waitThreads()
182
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
183

    
184
  @_Repeat
185
  def testSharerBlocksExclusive(self):
186
    self.sl.acquire(shared=1)
187
    self._addThread(target=self._doItExclusive)
188
    self.assertRaises(Queue.Empty, self.done.get_nowait)
189
    self.sl.release()
190
    self._waitThreads()
191
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
192

    
193
  @_Repeat
194
  def testSharerBlocksDelete(self):
195
    self.sl.acquire(shared=1)
196
    self._addThread(target=self._doItDelete)
197
    self.assertRaises(Queue.Empty, self.done.get_nowait)
198
    self.sl.release()
199
    self._waitThreads()
200
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
201
    self.sl = locking.SharedLock()
202

    
203
  @_Repeat
204
  def testWaitingExclusiveBlocksSharer(self):
205
    """SKIPPED testWaitingExclusiveBlockSharer"""
206
    return
207

    
208
    self.sl.acquire(shared=1)
209
    # the lock is acquired in shared mode...
210
    self._addThread(target=self._doItExclusive)
211
    # ...but now an exclusive is waiting...
212
    self._addThread(target=self._doItSharer)
213
    # ...so the sharer should be blocked as well
214
    self.assertRaises(Queue.Empty, self.done.get_nowait)
215
    self.sl.release()
216
    self._waitThreads()
217
    # The exclusive passed before
218
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
219
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
220

    
221
  @_Repeat
222
  def testWaitingSharerBlocksExclusive(self):
223
    """SKIPPED testWaitingSharerBlocksExclusive"""
224
    return
225

    
226
    self.sl.acquire()
227
    # the lock is acquired in exclusive mode...
228
    self._addThread(target=self._doItSharer)
229
    # ...but now a sharer is waiting...
230
    self._addThread(target=self._doItExclusive)
231
    # ...the exclusive is waiting too...
232
    self.assertRaises(Queue.Empty, self.done.get_nowait)
233
    self.sl.release()
234
    self._waitThreads()
235
    # The sharer passed before
236
    self.assertEqual(self.done.get_nowait(), 'SHR')
237
    self.assertEqual(self.done.get_nowait(), 'EXC')
238

    
239
  def testDelete(self):
240
    self.sl.delete()
241
    self.assertRaises(errors.LockError, self.sl.acquire)
242
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
243
    self.assertRaises(errors.LockError, self.sl.delete)
244

    
245
  def testNoDeleteIfSharer(self):
246
    self.sl.acquire(shared=1)
247
    self.assertRaises(AssertionError, self.sl.delete)
248

    
249
  @_Repeat
250
  def testDeletePendingSharersExclusiveDelete(self):
251
    self.sl.acquire()
252
    self._addThread(target=self._doItSharer)
253
    self._addThread(target=self._doItSharer)
254
    self._addThread(target=self._doItExclusive)
255
    self._addThread(target=self._doItDelete)
256
    self.sl.delete()
257
    self._waitThreads()
258
    # The threads who were pending return ERR
259
    for _ in range(4):
260
      self.assertEqual(self.done.get_nowait(), 'ERR')
261
    self.sl = locking.SharedLock()
262

    
263
  @_Repeat
264
  def testDeletePendingDeleteExclusiveSharers(self):
265
    self.sl.acquire()
266
    self._addThread(target=self._doItDelete)
267
    self._addThread(target=self._doItExclusive)
268
    self._addThread(target=self._doItSharer)
269
    self._addThread(target=self._doItSharer)
270
    self.sl.delete()
271
    self._waitThreads()
272
    # The two threads who were pending return both ERR
273
    self.assertEqual(self.done.get_nowait(), 'ERR')
274
    self.assertEqual(self.done.get_nowait(), 'ERR')
275
    self.assertEqual(self.done.get_nowait(), 'ERR')
276
    self.assertEqual(self.done.get_nowait(), 'ERR')
277
    self.sl = locking.SharedLock()
278

    
279
  @_Repeat
280
  def testExclusiveAcquireTimeout(self):
281
    def _LockExclusive(wait):
282
      self.sl.acquire(shared=0)
283
      self.done.put("A: start sleep")
284
      time.sleep(wait)
285
      self.done.put("A: end sleep")
286
      self.sl.release()
287

    
288
    for shared in [0, 1]:
289
      # Start thread to hold lock for 20 ms
290
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
291

    
292
      # Wait up to 100 ms to get lock
293
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
294
      self.done.put("got 2nd")
295
      self.sl.release()
296

    
297
      self._waitThreads()
298

    
299
      self.assertEqual(self.done.get_nowait(), "A: start sleep")
300
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
301
      self.assertEqual(self.done.get_nowait(), "got 2nd")
302
      self.assertRaises(Queue.Empty, self.done.get_nowait)
303

    
304
  @_Repeat
305
  def testAcquireExpiringTimeout(self):
306
    def _AcquireWithTimeout(shared, timeout):
307
      if not self.sl.acquire(shared=shared, timeout=timeout):
308
        self.done.put("timeout")
309

    
310
    for shared in [0, 1]:
311
      # Lock exclusively
312
      self.sl.acquire()
313

    
314
      # Start shared acquires with timeout between 0 and 20 ms
315
      for i in xrange(11):
316
        self._addThread(target=_AcquireWithTimeout,
317
                        args=(shared, i * 2.0 / 1000.0))
318

    
319
      # Wait for threads to finish (makes sure the acquire timeout expires
320
      # before releasing the lock)
321
      self._waitThreads()
322

    
323
      # Release lock
324
      self.sl.release()
325

    
326
      for _ in xrange(11):
327
        self.assertEqual(self.done.get_nowait(), "timeout")
328

    
329
      self.assertRaises(Queue.Empty, self.done.get_nowait)
330

    
331
  @_Repeat
332
  def testSharedSkipExclusiveAcquires(self):
333
    # Tests whether shared acquires jump in front of exclusive acquires in the
334
    # queue.
335

    
336
    # Get exclusive lock while we fill the queue
337
    self.sl.acquire()
338

    
339
    def _Acquire(shared, name):
340
      if not self.sl.acquire(shared=shared):
341
        return
342

    
343
      self.done.put(name)
344
      self.sl.release()
345

    
346
    # Start shared acquires
347
    for _ in xrange(5):
348
      self._addThread(target=_Acquire, args=(1, "shared A"))
349

    
350
    # Start exclusive acquires
351
    for _ in xrange(3):
352
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
353

    
354
    # More shared acquires
355
    for _ in xrange(5):
356
      self._addThread(target=_Acquire, args=(1, "shared C"))
357

    
358
    # More exclusive acquires
359
    for _ in xrange(3):
360
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
361

    
362
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
363
    # together
364
    self.assertEqual(self.sl._count_pending(), 7)
365

    
366
    # Release exclusive lock and wait
367
    self.sl.release()
368

    
369
    self._waitThreads()
370

    
371
    # Check sequence
372
    for _ in xrange(10):
373
      # Shared locks aren't guaranteed to be notified in order, but they'll be
374
      # first
375
      self.assert_(self.done.get_nowait() in ("shared A", "shared C"))
376

    
377
    for _ in xrange(3):
378
      self.assertEqual(self.done.get_nowait(), "exclusive B")
379

    
380
    for _ in xrange(3):
381
      self.assertEqual(self.done.get_nowait(), "exclusive D")
382

    
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384

    
385
  @_Repeat
386
  def testMixedAcquireTimeout(self):
387
    sync = threading.Condition()
388

    
389
    def _AcquireShared(ev):
390
      if not self.sl.acquire(shared=1, timeout=None):
391
        return
392

    
393
      self.done.put("shared")
394

    
395
      # Notify main thread
396
      ev.set()
397

    
398
      # Wait for notification
399
      sync.acquire()
400
      try:
401
        sync.wait()
402
      finally:
403
        sync.release()
404

    
405
      # Release lock
406
      self.sl.release()
407

    
408
    acquires = []
409
    for _ in xrange(3):
410
      ev = threading.Event()
411
      self._addThread(target=_AcquireShared, args=(ev, ))
412
      acquires.append(ev)
413

    
414
    # Wait for all acquires to finish
415
    for i in acquires:
416
      i.wait()
417

    
418
    self.assertEqual(self.sl._count_pending(), 0)
419

    
420
    # Try to get exclusive lock
421
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
422

    
423
    # Acquire exclusive without timeout
424
    exclsync = threading.Condition()
425
    exclev = threading.Event()
426

    
427
    def _AcquireExclusive():
428
      if not self.sl.acquire(shared=0):
429
        return
430

    
431
      self.done.put("exclusive")
432

    
433
      # Notify main thread
434
      exclev.set()
435

    
436
      exclsync.acquire()
437
      try:
438
        exclsync.wait()
439
      finally:
440
        exclsync.release()
441

    
442
      self.sl.release()
443

    
444
    self._addThread(target=_AcquireExclusive)
445

    
446
    # Try to get exclusive lock
447
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
448

    
449
    # Make all shared holders release their locks
450
    sync.acquire()
451
    try:
452
      sync.notifyAll()
453
    finally:
454
      sync.release()
455

    
456
    # Wait for exclusive acquire to succeed
457
    exclev.wait()
458

    
459
    self.assertEqual(self.sl._count_pending(), 0)
460

    
461
    # Try to get exclusive lock
462
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
463

    
464
    def _AcquireSharedSimple():
465
      if self.sl.acquire(shared=1, timeout=None):
466
        self.done.put("shared2")
467
        self.sl.release()
468

    
469
    for _ in xrange(10):
470
      self._addThread(target=_AcquireSharedSimple)
471

    
472
    # Tell exclusive lock to release
473
    exclsync.acquire()
474
    try:
475
      exclsync.notifyAll()
476
    finally:
477
      exclsync.release()
478

    
479
    # Wait for everything to finish
480
    self._waitThreads()
481

    
482
    self.assertEqual(self.sl._count_pending(), 0)
483

    
484
    # Check sequence
485
    for _ in xrange(3):
486
      self.assertEqual(self.done.get_nowait(), "shared")
487

    
488
    self.assertEqual(self.done.get_nowait(), "exclusive")
489

    
490
    for _ in xrange(10):
491
      self.assertEqual(self.done.get_nowait(), "shared2")
492

    
493
    self.assertRaises(Queue.Empty, self.done.get_nowait)
494

    
495

    
496
class TestSSynchronizedDecorator(_ThreadedTestCase):
497
  """Shared Lock Synchronized decorator test"""
498

    
499
  def setUp(self):
500
    _ThreadedTestCase.setUp(self)
501
    # helper threads use the 'done' queue to tell the master they finished.
502
    self.done = Queue.Queue(0)
503

    
504
  @locking.ssynchronized(_decoratorlock)
505
  def _doItExclusive(self):
506
    self.assert_(_decoratorlock._is_owned())
507
    self.done.put('EXC')
508

    
509
  @locking.ssynchronized(_decoratorlock, shared=1)
510
  def _doItSharer(self):
511
    self.assert_(_decoratorlock._is_owned(shared=1))
512
    self.done.put('SHR')
513

    
514
  def testDecoratedFunctions(self):
515
    self._doItExclusive()
516
    self.assert_(not _decoratorlock._is_owned())
517
    self._doItSharer()
518
    self.assert_(not _decoratorlock._is_owned())
519

    
520
  def testSharersCanCoexist(self):
521
    _decoratorlock.acquire(shared=1)
522
    threading.Thread(target=self._doItSharer).start()
523
    self.assert_(self.done.get(True, 1))
524
    _decoratorlock.release()
525

    
526
  @_Repeat
527
  def testExclusiveBlocksExclusive(self):
528
    _decoratorlock.acquire()
529
    self._addThread(target=self._doItExclusive)
530
    # give it a bit of time to check that it's not actually doing anything
531
    self.assertRaises(Queue.Empty, self.done.get_nowait)
532
    _decoratorlock.release()
533
    self._waitThreads()
534
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
535

    
536
  @_Repeat
537
  def testExclusiveBlocksSharer(self):
538
    _decoratorlock.acquire()
539
    self._addThread(target=self._doItSharer)
540
    self.assertRaises(Queue.Empty, self.done.get_nowait)
541
    _decoratorlock.release()
542
    self._waitThreads()
543
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
544

    
545
  @_Repeat
546
  def testSharerBlocksExclusive(self):
547
    _decoratorlock.acquire(shared=1)
548
    self._addThread(target=self._doItExclusive)
549
    self.assertRaises(Queue.Empty, self.done.get_nowait)
550
    _decoratorlock.release()
551
    self._waitThreads()
552
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
553

    
554

    
555
class TestLockSet(_ThreadedTestCase):
556
  """LockSet tests"""
557

    
558
  def setUp(self):
559
    _ThreadedTestCase.setUp(self)
560
    self._setUpLS()
561
    # helper threads use the 'done' queue to tell the master they finished.
562
    self.done = Queue.Queue(0)
563

    
564
  def _setUpLS(self):
565
    """Helper to (re)initialize the lock set"""
566
    self.resources = ['one', 'two', 'three']
567
    self.ls = locking.LockSet(members=self.resources)
568

    
569
  def testResources(self):
570
    self.assertEquals(self.ls._names(), set(self.resources))
571
    newls = locking.LockSet()
572
    self.assertEquals(newls._names(), set())
573

    
574
  def testAcquireRelease(self):
575
    self.assert_(self.ls.acquire('one'))
576
    self.assertEquals(self.ls._list_owned(), set(['one']))
577
    self.ls.release()
578
    self.assertEquals(self.ls._list_owned(), set())
579
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
580
    self.assertEquals(self.ls._list_owned(), set(['one']))
581
    self.ls.release()
582
    self.assertEquals(self.ls._list_owned(), set())
583
    self.ls.acquire(['one', 'two', 'three'])
584
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
585
    self.ls.release('one')
586
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
587
    self.ls.release(['three'])
588
    self.assertEquals(self.ls._list_owned(), set(['two']))
589
    self.ls.release()
590
    self.assertEquals(self.ls._list_owned(), set())
591
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
592
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
593
    self.ls.release()
594
    self.assertEquals(self.ls._list_owned(), set())
595

    
596
  def testNoDoubleAcquire(self):
597
    self.ls.acquire('one')
598
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
599
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
600
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
601
    self.ls.release()
602
    self.ls.acquire(['one', 'three'])
603
    self.ls.release('one')
604
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
605
    self.ls.release('three')
606

    
607
  def testNoWrongRelease(self):
608
    self.assertRaises(AssertionError, self.ls.release)
609
    self.ls.acquire('one')
610
    self.assertRaises(AssertionError, self.ls.release, 'two')
611

    
612
  def testAddRemove(self):
613
    self.ls.add('four')
614
    self.assertEquals(self.ls._list_owned(), set())
615
    self.assert_('four' in self.ls._names())
616
    self.ls.add(['five', 'six', 'seven'], acquired=1)
617
    self.assert_('five' in self.ls._names())
618
    self.assert_('six' in self.ls._names())
619
    self.assert_('seven' in self.ls._names())
620
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
621
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
622
    self.assert_('five' not in self.ls._names())
623
    self.assert_('six' not in self.ls._names())
624
    self.assertEquals(self.ls._list_owned(), set(['seven']))
625
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
626
    self.ls.remove('seven')
627
    self.assert_('seven' not in self.ls._names())
628
    self.assertEquals(self.ls._list_owned(), set([]))
629
    self.ls.acquire(None, shared=1)
630
    self.assertRaises(AssertionError, self.ls.add, 'eight')
631
    self.ls.release()
632
    self.ls.acquire(None)
633
    self.ls.add('eight', acquired=1)
634
    self.assert_('eight' in self.ls._names())
635
    self.assert_('eight' in self.ls._list_owned())
636
    self.ls.add('nine')
637
    self.assert_('nine' in self.ls._names())
638
    self.assert_('nine' not in self.ls._list_owned())
639
    self.ls.release()
640
    self.ls.remove(['two'])
641
    self.assert_('two' not in self.ls._names())
642
    self.ls.acquire('three')
643
    self.assertEquals(self.ls.remove(['three']), ['three'])
644
    self.assert_('three' not in self.ls._names())
645
    self.assertEquals(self.ls.remove('three'), [])
646
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
647
    self.assert_('one' not in self.ls._names())
648

    
649
  def testRemoveNonBlocking(self):
650
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
651
    self.ls.acquire('one')
652
    self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
653
    self.ls.acquire(['two', 'three'])
654
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
655
                      ['two', 'three'])
656

    
657
  def testNoDoubleAdd(self):
658
    self.assertRaises(errors.LockError, self.ls.add, 'two')
659
    self.ls.add('four')
660
    self.assertRaises(errors.LockError, self.ls.add, 'four')
661

    
662
  def testNoWrongRemoves(self):
663
    self.ls.acquire(['one', 'three'], shared=1)
664
    # Cannot remove 'two' while holding something which is not a superset
665
    self.assertRaises(AssertionError, self.ls.remove, 'two')
666
    # Cannot remove 'three' as we are sharing it
667
    self.assertRaises(AssertionError, self.ls.remove, 'three')
668

    
669
  def testAcquireSetLock(self):
670
    # acquire the set-lock exclusively
671
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
672
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
673
    self.assertEquals(self.ls._is_owned(), True)
674
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
675
    # I can still add/remove elements...
676
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
677
    self.assert_(self.ls.add('six'))
678
    self.ls.release()
679
    # share the set-lock
680
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
681
    # adding new elements is not possible
682
    self.assertRaises(AssertionError, self.ls.add, 'five')
683
    self.ls.release()
684

    
685
  def testAcquireWithRepetitions(self):
686
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
687
                      set(['two', 'two', 'three']))
688
    self.ls.release(['two', 'two'])
689
    self.assertEquals(self.ls._list_owned(), set(['three']))
690

    
691
  def testEmptyAcquire(self):
692
    # Acquire an empty list of locks...
693
    self.assertEquals(self.ls.acquire([]), set())
694
    self.assertEquals(self.ls._list_owned(), set())
695
    # New locks can still be addded
696
    self.assert_(self.ls.add('six'))
697
    # "re-acquiring" is not an issue, since we had really acquired nothing
698
    self.assertEquals(self.ls.acquire([], shared=1), set())
699
    self.assertEquals(self.ls._list_owned(), set())
700
    # We haven't really acquired anything, so we cannot release
701
    self.assertRaises(AssertionError, self.ls.release)
702

    
703
  def _doLockSet(self, names, shared):
704
    try:
705
      self.ls.acquire(names, shared=shared)
706
      self.done.put('DONE')
707
      self.ls.release()
708
    except errors.LockError:
709
      self.done.put('ERR')
710

    
711
  def _doAddSet(self, names):
712
    try:
713
      self.ls.add(names, acquired=1)
714
      self.done.put('DONE')
715
      self.ls.release()
716
    except errors.LockError:
717
      self.done.put('ERR')
718

    
719
  def _doRemoveSet(self, names):
720
    self.done.put(self.ls.remove(names))
721

    
722
  @_Repeat
723
  def testConcurrentSharedAcquire(self):
724
    self.ls.acquire(['one', 'two'], shared=1)
725
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
726
    self._waitThreads()
727
    self.assertEqual(self.done.get_nowait(), 'DONE')
728
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
729
    self._waitThreads()
730
    self.assertEqual(self.done.get_nowait(), 'DONE')
731
    self._addThread(target=self._doLockSet, args=('three', 1))
732
    self._waitThreads()
733
    self.assertEqual(self.done.get_nowait(), 'DONE')
734
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
735
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
736
    self.assertRaises(Queue.Empty, self.done.get_nowait)
737
    self.ls.release()
738
    self._waitThreads()
739
    self.assertEqual(self.done.get_nowait(), 'DONE')
740
    self.assertEqual(self.done.get_nowait(), 'DONE')
741

    
742
  @_Repeat
743
  def testConcurrentExclusiveAcquire(self):
744
    self.ls.acquire(['one', 'two'])
745
    self._addThread(target=self._doLockSet, args=('three', 1))
746
    self._waitThreads()
747
    self.assertEqual(self.done.get_nowait(), 'DONE')
748
    self._addThread(target=self._doLockSet, args=('three', 0))
749
    self._waitThreads()
750
    self.assertEqual(self.done.get_nowait(), 'DONE')
751
    self.assertRaises(Queue.Empty, self.done.get_nowait)
752
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
753
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
754
    self._addThread(target=self._doLockSet, args=('one', 0))
755
    self._addThread(target=self._doLockSet, args=('one', 1))
756
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
757
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
758
    self.assertRaises(Queue.Empty, self.done.get_nowait)
759
    self.ls.release()
760
    self._waitThreads()
761
    for _ in range(6):
762
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
763

    
764
  @_Repeat
765
  def testConcurrentRemove(self):
766
    self.ls.add('four')
767
    self.ls.acquire(['one', 'two', 'four'])
768
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
769
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
770
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
771
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
772
    self.assertRaises(Queue.Empty, self.done.get_nowait)
773
    self.ls.remove('one')
774
    self.ls.release()
775
    self._waitThreads()
776
    for i in range(4):
777
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
778
    self.ls.add(['five', 'six'], acquired=1)
779
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
780
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
781
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
782
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
783
    self.ls.remove('five')
784
    self.ls.release()
785
    self._waitThreads()
786
    for i in range(4):
787
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
788
    self.ls.acquire(['three', 'four'])
789
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
790
    self.assertRaises(Queue.Empty, self.done.get_nowait)
791
    self.ls.remove('four')
792
    self._waitThreads()
793
    self.assertEqual(self.done.get_nowait(), ['six'])
794
    self._addThread(target=self._doRemoveSet, args=(['two']))
795
    self._waitThreads()
796
    self.assertEqual(self.done.get_nowait(), ['two'])
797
    self.ls.release()
798
    # reset lockset
799
    self._setUpLS()
800

    
801
  @_Repeat
802
  def testConcurrentSharedSetLock(self):
803
    # share the set-lock...
804
    self.ls.acquire(None, shared=1)
805
    # ...another thread can share it too
806
    self._addThread(target=self._doLockSet, args=(None, 1))
807
    self._waitThreads()
808
    self.assertEqual(self.done.get_nowait(), 'DONE')
809
    # ...or just share some elements
810
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
811
    self._waitThreads()
812
    self.assertEqual(self.done.get_nowait(), 'DONE')
813
    # ...but not add new ones or remove any
814
    t = self._addThread(target=self._doAddSet, args=(['nine']))
815
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
816
    self.assertRaises(Queue.Empty, self.done.get_nowait)
817
    # this just releases the set-lock
818
    self.ls.release([])
819
    t.join(60)
820
    self.assertEqual(self.done.get_nowait(), 'DONE')
821
    # release the lock on the actual elements so remove() can proceed too
822
    self.ls.release()
823
    self._waitThreads()
824
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
825
    # reset lockset
826
    self._setUpLS()
827

    
828
  @_Repeat
829
  def testConcurrentExclusiveSetLock(self):
830
    # acquire the set-lock...
831
    self.ls.acquire(None, shared=0)
832
    # ...no one can do anything else
833
    self._addThread(target=self._doLockSet, args=(None, 1))
834
    self._addThread(target=self._doLockSet, args=(None, 0))
835
    self._addThread(target=self._doLockSet, args=(['three'], 0))
836
    self._addThread(target=self._doLockSet, args=(['two'], 1))
837
    self._addThread(target=self._doAddSet, args=(['nine']))
838
    self.assertRaises(Queue.Empty, self.done.get_nowait)
839
    self.ls.release()
840
    self._waitThreads()
841
    for _ in range(5):
842
      self.assertEqual(self.done.get(True, 1), 'DONE')
843
    # cleanup
844
    self._setUpLS()
845

    
846
  @_Repeat
847
  def testConcurrentSetLockAdd(self):
848
    self.ls.acquire('one')
849
    # Another thread wants the whole SetLock
850
    self._addThread(target=self._doLockSet, args=(None, 0))
851
    self._addThread(target=self._doLockSet, args=(None, 1))
852
    self.assertRaises(Queue.Empty, self.done.get_nowait)
853
    self.assertRaises(AssertionError, self.ls.add, 'four')
854
    self.ls.release()
855
    self._waitThreads()
856
    self.assertEqual(self.done.get_nowait(), 'DONE')
857
    self.assertEqual(self.done.get_nowait(), 'DONE')
858
    self.ls.acquire(None)
859
    self._addThread(target=self._doLockSet, args=(None, 0))
860
    self._addThread(target=self._doLockSet, args=(None, 1))
861
    self.assertRaises(Queue.Empty, self.done.get_nowait)
862
    self.ls.add('four')
863
    self.ls.add('five', acquired=1)
864
    self.ls.add('six', acquired=1, shared=1)
865
    self.assertEquals(self.ls._list_owned(),
866
      set(['one', 'two', 'three', 'five', 'six']))
867
    self.assertEquals(self.ls._is_owned(), True)
868
    self.assertEquals(self.ls._names(),
869
      set(['one', 'two', 'three', 'four', 'five', 'six']))
870
    self.ls.release()
871
    self._waitThreads()
872
    self.assertEqual(self.done.get_nowait(), 'DONE')
873
    self.assertEqual(self.done.get_nowait(), 'DONE')
874
    self._setUpLS()
875

    
876
  @_Repeat
877
  def testEmptyLockSet(self):
878
    # get the set-lock
879
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
880
    # now empty it...
881
    self.ls.remove(['one', 'two', 'three'])
882
    # and adds/locks by another thread still wait
883
    self._addThread(target=self._doAddSet, args=(['nine']))
884
    self._addThread(target=self._doLockSet, args=(None, 1))
885
    self._addThread(target=self._doLockSet, args=(None, 0))
886
    self.assertRaises(Queue.Empty, self.done.get_nowait)
887
    self.ls.release()
888
    self._waitThreads()
889
    for _ in range(3):
890
      self.assertEqual(self.done.get_nowait(), 'DONE')
891
    # empty it again...
892
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
893
    # now share it...
894
    self.assertEqual(self.ls.acquire(None, shared=1), set())
895
    # other sharers can go, adds still wait
896
    self._addThread(target=self._doLockSet, args=(None, 1))
897
    self._waitThreads()
898
    self.assertEqual(self.done.get_nowait(), 'DONE')
899
    self._addThread(target=self._doAddSet, args=(['nine']))
900
    self.assertRaises(Queue.Empty, self.done.get_nowait)
901
    self.ls.release()
902
    self._waitThreads()
903
    self.assertEqual(self.done.get_nowait(), 'DONE')
904
    self._setUpLS()
905

    
906

    
907
class TestGanetiLockManager(_ThreadedTestCase):
908

    
909
  def setUp(self):
910
    _ThreadedTestCase.setUp(self)
911
    self.nodes=['n1', 'n2']
912
    self.instances=['i1', 'i2', 'i3']
913
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
914
                                        instances=self.instances)
915
    self.done = Queue.Queue(0)
916

    
917
  def tearDown(self):
918
    # Don't try this at home...
919
    locking.GanetiLockManager._instance = None
920

    
921
  def testLockingConstants(self):
922
    # The locking library internally cheats by assuming its constants have some
923
    # relationships with each other. Check those hold true.
924
    # This relationship is also used in the Processor to recursively acquire
925
    # the right locks. Again, please don't break it.
926
    for i in range(len(locking.LEVELS)):
927
      self.assertEqual(i, locking.LEVELS[i])
928

    
929
  def testDoubleGLFails(self):
930
    self.assertRaises(AssertionError, locking.GanetiLockManager)
931

    
932
  def testLockNames(self):
933
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
934
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
935
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
936
                     set(self.instances))
937

    
938
  def testInitAndResources(self):
939
    locking.GanetiLockManager._instance = None
940
    self.GL = locking.GanetiLockManager()
941
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
942
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
943
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
944

    
945
    locking.GanetiLockManager._instance = None
946
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
947
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
948
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
949
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
950

    
951
    locking.GanetiLockManager._instance = None
952
    self.GL = locking.GanetiLockManager(instances=self.instances)
953
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
954
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
955
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
956
                     set(self.instances))
957

    
958
  def testAcquireRelease(self):
959
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
960
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
961
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
962
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
963
    self.GL.release(locking.LEVEL_NODE, ['n2'])
964
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
965
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
966
    self.GL.release(locking.LEVEL_NODE)
967
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
968
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
969
    self.GL.release(locking.LEVEL_INSTANCE)
970
    self.assertRaises(errors.LockError, self.GL.acquire,
971
                      locking.LEVEL_INSTANCE, ['i5'])
972
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
973
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
974

    
975
  def testAcquireWholeSets(self):
976
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
977
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
978
                      set(self.instances))
979
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
980
                      set(self.instances))
981
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
982
                      set(self.nodes))
983
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
984
                      set(self.nodes))
985
    self.GL.release(locking.LEVEL_NODE)
986
    self.GL.release(locking.LEVEL_INSTANCE)
987
    self.GL.release(locking.LEVEL_CLUSTER)
988

    
989
  def testAcquireWholeAndPartial(self):
990
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
991
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
992
                      set(self.instances))
993
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
994
                      set(self.instances))
995
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
996
                      set(['n2']))
997
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
998
                      set(['n2']))
999
    self.GL.release(locking.LEVEL_NODE)
1000
    self.GL.release(locking.LEVEL_INSTANCE)
1001
    self.GL.release(locking.LEVEL_CLUSTER)
1002

    
1003
  def testBGLDependency(self):
1004
    self.assertRaises(AssertionError, self.GL.acquire,
1005
                      locking.LEVEL_NODE, ['n1', 'n2'])
1006
    self.assertRaises(AssertionError, self.GL.acquire,
1007
                      locking.LEVEL_INSTANCE, ['i3'])
1008
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1009
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1010
    self.assertRaises(AssertionError, self.GL.release,
1011
                      locking.LEVEL_CLUSTER, ['BGL'])
1012
    self.assertRaises(AssertionError, self.GL.release,
1013
                      locking.LEVEL_CLUSTER)
1014
    self.GL.release(locking.LEVEL_NODE)
1015
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1016
    self.assertRaises(AssertionError, self.GL.release,
1017
                      locking.LEVEL_CLUSTER, ['BGL'])
1018
    self.assertRaises(AssertionError, self.GL.release,
1019
                      locking.LEVEL_CLUSTER)
1020
    self.GL.release(locking.LEVEL_INSTANCE)
1021

    
1022
  def testWrongOrder(self):
1023
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1024
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1025
    self.assertRaises(AssertionError, self.GL.acquire,
1026
                      locking.LEVEL_NODE, ['n1'])
1027
    self.assertRaises(AssertionError, self.GL.acquire,
1028
                      locking.LEVEL_INSTANCE, ['i2'])
1029

    
1030
  # Helper function to run as a thread that shared the BGL and then acquires
1031
  # some locks at another level.
1032
  def _doLock(self, level, names, shared):
1033
    try:
1034
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1035
      self.GL.acquire(level, names, shared=shared)
1036
      self.done.put('DONE')
1037
      self.GL.release(level)
1038
      self.GL.release(locking.LEVEL_CLUSTER)
1039
    except errors.LockError:
1040
      self.done.put('ERR')
1041

    
1042
  @_Repeat
1043
  def testConcurrency(self):
1044
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1045
    self._addThread(target=self._doLock,
1046
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1047
    self._waitThreads()
1048
    self.assertEqual(self.done.get_nowait(), 'DONE')
1049
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1050
    self._addThread(target=self._doLock,
1051
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1052
    self._waitThreads()
1053
    self.assertEqual(self.done.get_nowait(), 'DONE')
1054
    self._addThread(target=self._doLock,
1055
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1056
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1057
    self.GL.release(locking.LEVEL_INSTANCE)
1058
    self._waitThreads()
1059
    self.assertEqual(self.done.get_nowait(), 'DONE')
1060
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1061
    self._addThread(target=self._doLock,
1062
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1063
    self._waitThreads()
1064
    self.assertEqual(self.done.get_nowait(), 'DONE')
1065
    self._addThread(target=self._doLock,
1066
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1067
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1068
    self.GL.release(locking.LEVEL_INSTANCE)
1069
    self._waitThreads()
1070
    self.assertEqual(self.done.get(True, 1), 'DONE')
1071
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1072

    
1073

    
1074
if __name__ == '__main__':
1075
  unittest.main()
1076
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1077
  #unittest.TextTestRunner(verbosity=2).run(suite)