Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 008b92fa

History | View | Annotate | Download (42.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.done = Queue.Queue(0)
56
    self.threads = []
57

    
58
  def _addThread(self, *args, **kwargs):
59
    """Create and remember a new thread"""
60
    t = threading.Thread(*args, **kwargs)
61
    self.threads.append(t)
62
    t.start()
63
    return t
64

    
65
  def _waitThreads(self):
66
    """Wait for all our threads to finish"""
67
    for t in self.threads:
68
      t.join(60)
69
      self.failIf(t.isAlive())
70
    self.threads = []
71

    
72

    
73
class _ConditionTestCase(_ThreadedTestCase):
74
  """Common test case for conditions"""
75

    
76
  def setUp(self, cls):
77
    _ThreadedTestCase.setUp(self)
78
    self.lock = threading.Lock()
79
    self.cond = cls(self.lock)
80

    
81
  def _testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def _testNotification(self):
97
    def _NotifyAll():
98
      self.done.put("NE")
99
      self.cond.acquire()
100
      self.done.put("NA")
101
      self.cond.notifyAll()
102
      self.done.put("NN")
103
      self.cond.release()
104

    
105
    self.cond.acquire()
106
    self._addThread(target=_NotifyAll)
107
    self.assertEqual(self.done.get(True, 1), "NE")
108
    self.assertRaises(Queue.Empty, self.done.get_nowait)
109
    self.cond.wait()
110
    self.assertEqual(self.done.get(True, 1), "NA")
111
    self.assertEqual(self.done.get(True, 1), "NN")
112
    self.assert_(self.cond._is_owned())
113
    self.cond.release()
114
    self.assert_(not self.cond._is_owned())
115

    
116

    
117
class TestSingleNotifyPipeCondition(_ConditionTestCase):
118
  """SingleNotifyPipeCondition tests"""
119

    
120
  def setUp(self):
121
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
122

    
123
  def testAcquireRelease(self):
124
    self._testAcquireRelease()
125

    
126
  def testNotification(self):
127
    self._testNotification()
128

    
129
  def testWaitReuse(self):
130
    self.cond.acquire()
131
    self.cond.wait(0)
132
    self.cond.wait(0.1)
133
    self.cond.release()
134

    
135
  def testNoNotifyReuse(self):
136
    self.cond.acquire()
137
    self.cond.notifyAll()
138
    self.assertRaises(RuntimeError, self.cond.wait)
139
    self.assertRaises(RuntimeError, self.cond.notifyAll)
140
    self.cond.release()
141

    
142

    
143
class TestPipeCondition(_ConditionTestCase):
144
  """PipeCondition tests"""
145

    
146
  def setUp(self):
147
    _ConditionTestCase.setUp(self, locking.PipeCondition)
148

    
149
  def testAcquireRelease(self):
150
    self._testAcquireRelease()
151

    
152
  def testNotification(self):
153
    self._testNotification()
154

    
155
  def _TestWait(self, fn):
156
    self._addThread(target=fn)
157
    self._addThread(target=fn)
158
    self._addThread(target=fn)
159

    
160
    # Wait for threads to be waiting
161
    self.assertEqual(self.done.get(True, 1), "A")
162
    self.assertEqual(self.done.get(True, 1), "A")
163
    self.assertEqual(self.done.get(True, 1), "A")
164

    
165
    self.assertRaises(Queue.Empty, self.done.get_nowait)
166

    
167
    self.cond.acquire()
168
    self.assertEqual(self.cond._nwaiters, 3)
169
    # This new thread can"t acquire the lock, and thus call wait, before we
170
    # release it
171
    self._addThread(target=fn)
172
    self.cond.notifyAll()
173
    self.assertRaises(Queue.Empty, self.done.get_nowait)
174
    self.cond.release()
175

    
176
    # We should now get 3 W and 1 A (for the new thread) in whatever order
177
    w = 0
178
    a = 0
179
    for i in range(4):
180
      got = self.done.get(True, 1)
181
      if got == "W":
182
        w += 1
183
      elif got == "A":
184
        a += 1
185
      else:
186
        self.fail("Got %s on the done queue" % got)
187

    
188
    self.assertEqual(w, 3)
189
    self.assertEqual(a, 1)
190

    
191
    self.cond.acquire()
192
    self.cond.notifyAll()
193
    self.cond.release()
194
    self._waitThreads()
195
    self.assertEqual(self.done.get_nowait(), "W")
196
    self.assertRaises(Queue.Empty, self.done.get_nowait)
197

    
198
  def testBlockingWait(self):
199
    def _BlockingWait():
200
      self.cond.acquire()
201
      self.done.put("A")
202
      self.cond.wait()
203
      self.cond.release()
204
      self.done.put("W")
205

    
206
    self._TestWait(_BlockingWait)
207

    
208
  def testLongTimeoutWait(self):
209
    def _Helper():
210
      self.cond.acquire()
211
      self.done.put("A")
212
      self.cond.wait(15.0)
213
      self.cond.release()
214
      self.done.put("W")
215

    
216
    self._TestWait(_Helper)
217

    
218
  def _TimeoutWait(self, timeout, check):
219
    self.cond.acquire()
220
    self.cond.wait(timeout)
221
    self.cond.release()
222
    self.done.put(check)
223

    
224
  def testShortTimeoutWait(self):
225
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
226
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
227
    self._waitThreads()
228
    self.assertEqual(self.done.get_nowait(), "T1")
229
    self.assertEqual(self.done.get_nowait(), "T1")
230
    self.assertRaises(Queue.Empty, self.done.get_nowait)
231

    
232
  def testZeroTimeoutWait(self):
233
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
234
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
235
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
236
    self._waitThreads()
237
    self.assertEqual(self.done.get_nowait(), "T0")
238
    self.assertEqual(self.done.get_nowait(), "T0")
239
    self.assertEqual(self.done.get_nowait(), "T0")
240
    self.assertRaises(Queue.Empty, self.done.get_nowait)
241

    
242

    
243
class TestSharedLock(_ThreadedTestCase):
244
  """SharedLock tests"""
245

    
246
  def setUp(self):
247
    _ThreadedTestCase.setUp(self)
248
    self.sl = locking.SharedLock()
249

    
250
  def testSequenceAndOwnership(self):
251
    self.assert_(not self.sl._is_owned())
252
    self.sl.acquire(shared=1)
253
    self.assert_(self.sl._is_owned())
254
    self.assert_(self.sl._is_owned(shared=1))
255
    self.assert_(not self.sl._is_owned(shared=0))
256
    self.sl.release()
257
    self.assert_(not self.sl._is_owned())
258
    self.sl.acquire()
259
    self.assert_(self.sl._is_owned())
260
    self.assert_(not self.sl._is_owned(shared=1))
261
    self.assert_(self.sl._is_owned(shared=0))
262
    self.sl.release()
263
    self.assert_(not self.sl._is_owned())
264
    self.sl.acquire(shared=1)
265
    self.assert_(self.sl._is_owned())
266
    self.assert_(self.sl._is_owned(shared=1))
267
    self.assert_(not self.sl._is_owned(shared=0))
268
    self.sl.release()
269
    self.assert_(not self.sl._is_owned())
270

    
271
  def testBooleanValue(self):
272
    # semaphores are supposed to return a true value on a successful acquire
273
    self.assert_(self.sl.acquire(shared=1))
274
    self.sl.release()
275
    self.assert_(self.sl.acquire())
276
    self.sl.release()
277

    
278
  def testDoubleLockingStoE(self):
279
    self.sl.acquire(shared=1)
280
    self.assertRaises(AssertionError, self.sl.acquire)
281

    
282
  def testDoubleLockingEtoS(self):
283
    self.sl.acquire()
284
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
285

    
286
  def testDoubleLockingStoS(self):
287
    self.sl.acquire(shared=1)
288
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
289

    
290
  def testDoubleLockingEtoE(self):
291
    self.sl.acquire()
292
    self.assertRaises(AssertionError, self.sl.acquire)
293

    
294
  # helper functions: called in a separate thread they acquire the lock, send
295
  # their identifier on the done queue, then release it.
296
  def _doItSharer(self):
297
    try:
298
      self.sl.acquire(shared=1)
299
      self.done.put('SHR')
300
      self.sl.release()
301
    except errors.LockError:
302
      self.done.put('ERR')
303

    
304
  def _doItExclusive(self):
305
    try:
306
      self.sl.acquire()
307
      self.done.put('EXC')
308
      self.sl.release()
309
    except errors.LockError:
310
      self.done.put('ERR')
311

    
312
  def _doItDelete(self):
313
    try:
314
      self.sl.delete()
315
      self.done.put('DEL')
316
    except errors.LockError:
317
      self.done.put('ERR')
318

    
319
  def testSharersCanCoexist(self):
320
    self.sl.acquire(shared=1)
321
    threading.Thread(target=self._doItSharer).start()
322
    self.assert_(self.done.get(True, 1))
323
    self.sl.release()
324

    
325
  @_Repeat
326
  def testExclusiveBlocksExclusive(self):
327
    self.sl.acquire()
328
    self._addThread(target=self._doItExclusive)
329
    self.assertRaises(Queue.Empty, self.done.get_nowait)
330
    self.sl.release()
331
    self._waitThreads()
332
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
333

    
334
  @_Repeat
335
  def testExclusiveBlocksDelete(self):
336
    self.sl.acquire()
337
    self._addThread(target=self._doItDelete)
338
    self.assertRaises(Queue.Empty, self.done.get_nowait)
339
    self.sl.release()
340
    self._waitThreads()
341
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
342
    self.sl = locking.SharedLock()
343

    
344
  @_Repeat
345
  def testExclusiveBlocksSharer(self):
346
    self.sl.acquire()
347
    self._addThread(target=self._doItSharer)
348
    self.assertRaises(Queue.Empty, self.done.get_nowait)
349
    self.sl.release()
350
    self._waitThreads()
351
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
352

    
353
  @_Repeat
354
  def testSharerBlocksExclusive(self):
355
    self.sl.acquire(shared=1)
356
    self._addThread(target=self._doItExclusive)
357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
358
    self.sl.release()
359
    self._waitThreads()
360
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
361

    
362
  @_Repeat
363
  def testSharerBlocksDelete(self):
364
    self.sl.acquire(shared=1)
365
    self._addThread(target=self._doItDelete)
366
    self.assertRaises(Queue.Empty, self.done.get_nowait)
367
    self.sl.release()
368
    self._waitThreads()
369
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
370
    self.sl = locking.SharedLock()
371

    
372
  @_Repeat
373
  def testWaitingExclusiveBlocksSharer(self):
374
    """SKIPPED testWaitingExclusiveBlockSharer"""
375
    return
376

    
377
    self.sl.acquire(shared=1)
378
    # the lock is acquired in shared mode...
379
    self._addThread(target=self._doItExclusive)
380
    # ...but now an exclusive is waiting...
381
    self._addThread(target=self._doItSharer)
382
    # ...so the sharer should be blocked as well
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384
    self.sl.release()
385
    self._waitThreads()
386
    # The exclusive passed before
387
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
388
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
389

    
390
  @_Repeat
391
  def testWaitingSharerBlocksExclusive(self):
392
    """SKIPPED testWaitingSharerBlocksExclusive"""
393
    return
394

    
395
    self.sl.acquire()
396
    # the lock is acquired in exclusive mode...
397
    self._addThread(target=self._doItSharer)
398
    # ...but now a sharer is waiting...
399
    self._addThread(target=self._doItExclusive)
400
    # ...the exclusive is waiting too...
401
    self.assertRaises(Queue.Empty, self.done.get_nowait)
402
    self.sl.release()
403
    self._waitThreads()
404
    # The sharer passed before
405
    self.assertEqual(self.done.get_nowait(), 'SHR')
406
    self.assertEqual(self.done.get_nowait(), 'EXC')
407

    
408
  def testDelete(self):
409
    self.sl.delete()
410
    self.assertRaises(errors.LockError, self.sl.acquire)
411
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
412
    self.assertRaises(errors.LockError, self.sl.delete)
413

    
414
  def testDeleteTimeout(self):
415
    self.sl.delete(timeout=60)
416

    
417
  def testNoDeleteIfSharer(self):
418
    self.sl.acquire(shared=1)
419
    self.assertRaises(AssertionError, self.sl.delete)
420

    
421
  @_Repeat
422
  def testDeletePendingSharersExclusiveDelete(self):
423
    self.sl.acquire()
424
    self._addThread(target=self._doItSharer)
425
    self._addThread(target=self._doItSharer)
426
    self._addThread(target=self._doItExclusive)
427
    self._addThread(target=self._doItDelete)
428
    self.sl.delete()
429
    self._waitThreads()
430
    # The threads who were pending return ERR
431
    for _ in range(4):
432
      self.assertEqual(self.done.get_nowait(), 'ERR')
433
    self.sl = locking.SharedLock()
434

    
435
  @_Repeat
436
  def testDeletePendingDeleteExclusiveSharers(self):
437
    self.sl.acquire()
438
    self._addThread(target=self._doItDelete)
439
    self._addThread(target=self._doItExclusive)
440
    self._addThread(target=self._doItSharer)
441
    self._addThread(target=self._doItSharer)
442
    self.sl.delete()
443
    self._waitThreads()
444
    # The two threads who were pending return both ERR
445
    self.assertEqual(self.done.get_nowait(), 'ERR')
446
    self.assertEqual(self.done.get_nowait(), 'ERR')
447
    self.assertEqual(self.done.get_nowait(), 'ERR')
448
    self.assertEqual(self.done.get_nowait(), 'ERR')
449
    self.sl = locking.SharedLock()
450

    
451
  @_Repeat
452
  def testExclusiveAcquireTimeout(self):
453
    for shared in [0, 1]:
454
      on_queue = threading.Event()
455
      release_exclusive = threading.Event()
456

    
457
      def _LockExclusive():
458
        self.sl.acquire(shared=0, test_notify=on_queue.set)
459
        self.done.put("A: start wait")
460
        release_exclusive.wait()
461
        self.done.put("A: end wait")
462
        self.sl.release()
463

    
464
      # Start thread to hold lock in exclusive mode
465
      self._addThread(target=_LockExclusive)
466

    
467
      # Wait for wait to begin
468
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
469

    
470
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
471
      # on the queue
472
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
473
                                      test_notify=release_exclusive.set))
474

    
475
      self.done.put("got 2nd")
476
      self.sl.release()
477

    
478
      self._waitThreads()
479

    
480
      self.assertEqual(self.done.get_nowait(), "A: end wait")
481
      self.assertEqual(self.done.get_nowait(), "got 2nd")
482
      self.assertRaises(Queue.Empty, self.done.get_nowait)
483

    
484
  @_Repeat
485
  def testAcquireExpiringTimeout(self):
486
    def _AcquireWithTimeout(shared, timeout):
487
      if not self.sl.acquire(shared=shared, timeout=timeout):
488
        self.done.put("timeout")
489

    
490
    for shared in [0, 1]:
491
      # Lock exclusively
492
      self.sl.acquire()
493

    
494
      # Start shared acquires with timeout between 0 and 20 ms
495
      for i in xrange(11):
496
        self._addThread(target=_AcquireWithTimeout,
497
                        args=(shared, i * 2.0 / 1000.0))
498

    
499
      # Wait for threads to finish (makes sure the acquire timeout expires
500
      # before releasing the lock)
501
      self._waitThreads()
502

    
503
      # Release lock
504
      self.sl.release()
505

    
506
      for _ in xrange(11):
507
        self.assertEqual(self.done.get_nowait(), "timeout")
508

    
509
      self.assertRaises(Queue.Empty, self.done.get_nowait)
510

    
511
  @_Repeat
512
  def testSharedSkipExclusiveAcquires(self):
513
    # Tests whether shared acquires jump in front of exclusive acquires in the
514
    # queue.
515

    
516
    def _Acquire(shared, name, notify_ev, wait_ev):
517
      if notify_ev:
518
        notify_fn = notify_ev.set
519
      else:
520
        notify_fn = None
521

    
522
      if wait_ev:
523
        wait_ev.wait()
524

    
525
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
526
        return
527

    
528
      self.done.put(name)
529
      self.sl.release()
530

    
531
    # Get exclusive lock while we fill the queue
532
    self.sl.acquire()
533

    
534
    shrcnt1 = 5
535
    shrcnt2 = 7
536
    shrcnt3 = 9
537
    shrcnt4 = 2
538

    
539
    # Add acquires using threading.Event for synchronization. They'll be
540
    # acquired exactly in the order defined in this list.
541
    acquires = (shrcnt1 * [(1, "shared 1")] +
542
                3 * [(0, "exclusive 1")] +
543
                shrcnt2 * [(1, "shared 2")] +
544
                shrcnt3 * [(1, "shared 3")] +
545
                shrcnt4 * [(1, "shared 4")] +
546
                3 * [(0, "exclusive 2")])
547

    
548
    ev_cur = None
549
    ev_prev = None
550

    
551
    for args in acquires:
552
      ev_cur = threading.Event()
553
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
554
      ev_prev = ev_cur
555

    
556
    # Wait for last acquire to start
557
    ev_prev.wait()
558

    
559
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
560
    # together
561
    self.assertEqual(self.sl._count_pending(), 7)
562

    
563
    # Release exclusive lock and wait
564
    self.sl.release()
565

    
566
    self._waitThreads()
567

    
568
    # Check sequence
569
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
570
      # Shared locks aren't guaranteed to be notified in order, but they'll be
571
      # first
572
      tmp = self.done.get_nowait()
573
      if tmp == "shared 1":
574
        shrcnt1 -= 1
575
      elif tmp == "shared 2":
576
        shrcnt2 -= 1
577
      elif tmp == "shared 3":
578
        shrcnt3 -= 1
579
      elif tmp == "shared 4":
580
        shrcnt4 -= 1
581
    self.assertEqual(shrcnt1, 0)
582
    self.assertEqual(shrcnt2, 0)
583
    self.assertEqual(shrcnt3, 0)
584
    self.assertEqual(shrcnt3, 0)
585

    
586
    for _ in xrange(3):
587
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
588

    
589
    for _ in xrange(3):
590
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
591

    
592
    self.assertRaises(Queue.Empty, self.done.get_nowait)
593

    
594
  @_Repeat
595
  def testMixedAcquireTimeout(self):
596
    sync = threading.Condition()
597

    
598
    def _AcquireShared(ev):
599
      if not self.sl.acquire(shared=1, timeout=None):
600
        return
601

    
602
      self.done.put("shared")
603

    
604
      # Notify main thread
605
      ev.set()
606

    
607
      # Wait for notification
608
      sync.acquire()
609
      try:
610
        sync.wait()
611
      finally:
612
        sync.release()
613

    
614
      # Release lock
615
      self.sl.release()
616

    
617
    acquires = []
618
    for _ in xrange(3):
619
      ev = threading.Event()
620
      self._addThread(target=_AcquireShared, args=(ev, ))
621
      acquires.append(ev)
622

    
623
    # Wait for all acquires to finish
624
    for i in acquires:
625
      i.wait()
626

    
627
    self.assertEqual(self.sl._count_pending(), 0)
628

    
629
    # Try to get exclusive lock
630
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
631

    
632
    # Acquire exclusive without timeout
633
    exclsync = threading.Condition()
634
    exclev = threading.Event()
635

    
636
    def _AcquireExclusive():
637
      if not self.sl.acquire(shared=0):
638
        return
639

    
640
      self.done.put("exclusive")
641

    
642
      # Notify main thread
643
      exclev.set()
644

    
645
      exclsync.acquire()
646
      try:
647
        exclsync.wait()
648
      finally:
649
        exclsync.release()
650

    
651
      self.sl.release()
652

    
653
    self._addThread(target=_AcquireExclusive)
654

    
655
    # Try to get exclusive lock
656
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
657

    
658
    # Make all shared holders release their locks
659
    sync.acquire()
660
    try:
661
      sync.notifyAll()
662
    finally:
663
      sync.release()
664

    
665
    # Wait for exclusive acquire to succeed
666
    exclev.wait()
667

    
668
    self.assertEqual(self.sl._count_pending(), 0)
669

    
670
    # Try to get exclusive lock
671
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
672

    
673
    def _AcquireSharedSimple():
674
      if self.sl.acquire(shared=1, timeout=None):
675
        self.done.put("shared2")
676
        self.sl.release()
677

    
678
    for _ in xrange(10):
679
      self._addThread(target=_AcquireSharedSimple)
680

    
681
    # Tell exclusive lock to release
682
    exclsync.acquire()
683
    try:
684
      exclsync.notifyAll()
685
    finally:
686
      exclsync.release()
687

    
688
    # Wait for everything to finish
689
    self._waitThreads()
690

    
691
    self.assertEqual(self.sl._count_pending(), 0)
692

    
693
    # Check sequence
694
    for _ in xrange(3):
695
      self.assertEqual(self.done.get_nowait(), "shared")
696

    
697
    self.assertEqual(self.done.get_nowait(), "exclusive")
698

    
699
    for _ in xrange(10):
700
      self.assertEqual(self.done.get_nowait(), "shared2")
701

    
702
    self.assertRaises(Queue.Empty, self.done.get_nowait)
703

    
704

    
705
class TestSSynchronizedDecorator(_ThreadedTestCase):
706
  """Shared Lock Synchronized decorator test"""
707

    
708
  def setUp(self):
709
    _ThreadedTestCase.setUp(self)
710

    
711
  @locking.ssynchronized(_decoratorlock)
712
  def _doItExclusive(self):
713
    self.assert_(_decoratorlock._is_owned())
714
    self.done.put('EXC')
715

    
716
  @locking.ssynchronized(_decoratorlock, shared=1)
717
  def _doItSharer(self):
718
    self.assert_(_decoratorlock._is_owned(shared=1))
719
    self.done.put('SHR')
720

    
721
  def testDecoratedFunctions(self):
722
    self._doItExclusive()
723
    self.assert_(not _decoratorlock._is_owned())
724
    self._doItSharer()
725
    self.assert_(not _decoratorlock._is_owned())
726

    
727
  def testSharersCanCoexist(self):
728
    _decoratorlock.acquire(shared=1)
729
    threading.Thread(target=self._doItSharer).start()
730
    self.assert_(self.done.get(True, 1))
731
    _decoratorlock.release()
732

    
733
  @_Repeat
734
  def testExclusiveBlocksExclusive(self):
735
    _decoratorlock.acquire()
736
    self._addThread(target=self._doItExclusive)
737
    # give it a bit of time to check that it's not actually doing anything
738
    self.assertRaises(Queue.Empty, self.done.get_nowait)
739
    _decoratorlock.release()
740
    self._waitThreads()
741
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
742

    
743
  @_Repeat
744
  def testExclusiveBlocksSharer(self):
745
    _decoratorlock.acquire()
746
    self._addThread(target=self._doItSharer)
747
    self.assertRaises(Queue.Empty, self.done.get_nowait)
748
    _decoratorlock.release()
749
    self._waitThreads()
750
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
751

    
752
  @_Repeat
753
  def testSharerBlocksExclusive(self):
754
    _decoratorlock.acquire(shared=1)
755
    self._addThread(target=self._doItExclusive)
756
    self.assertRaises(Queue.Empty, self.done.get_nowait)
757
    _decoratorlock.release()
758
    self._waitThreads()
759
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
760

    
761

    
762
class TestLockSet(_ThreadedTestCase):
763
  """LockSet tests"""
764

    
765
  def setUp(self):
766
    _ThreadedTestCase.setUp(self)
767
    self._setUpLS()
768

    
769
  def _setUpLS(self):
770
    """Helper to (re)initialize the lock set"""
771
    self.resources = ['one', 'two', 'three']
772
    self.ls = locking.LockSet(members=self.resources)
773

    
774
  def testResources(self):
775
    self.assertEquals(self.ls._names(), set(self.resources))
776
    newls = locking.LockSet()
777
    self.assertEquals(newls._names(), set())
778

    
779
  def testAcquireRelease(self):
780
    self.assert_(self.ls.acquire('one'))
781
    self.assertEquals(self.ls._list_owned(), set(['one']))
782
    self.ls.release()
783
    self.assertEquals(self.ls._list_owned(), set())
784
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
785
    self.assertEquals(self.ls._list_owned(), set(['one']))
786
    self.ls.release()
787
    self.assertEquals(self.ls._list_owned(), set())
788
    self.ls.acquire(['one', 'two', 'three'])
789
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
790
    self.ls.release('one')
791
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
792
    self.ls.release(['three'])
793
    self.assertEquals(self.ls._list_owned(), set(['two']))
794
    self.ls.release()
795
    self.assertEquals(self.ls._list_owned(), set())
796
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
797
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
798
    self.ls.release()
799
    self.assertEquals(self.ls._list_owned(), set())
800

    
801
  def testNoDoubleAcquire(self):
802
    self.ls.acquire('one')
803
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
804
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
805
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
806
    self.ls.release()
807
    self.ls.acquire(['one', 'three'])
808
    self.ls.release('one')
809
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
810
    self.ls.release('three')
811

    
812
  def testNoWrongRelease(self):
813
    self.assertRaises(AssertionError, self.ls.release)
814
    self.ls.acquire('one')
815
    self.assertRaises(AssertionError, self.ls.release, 'two')
816

    
817
  def testAddRemove(self):
818
    self.ls.add('four')
819
    self.assertEquals(self.ls._list_owned(), set())
820
    self.assert_('four' in self.ls._names())
821
    self.ls.add(['five', 'six', 'seven'], acquired=1)
822
    self.assert_('five' in self.ls._names())
823
    self.assert_('six' in self.ls._names())
824
    self.assert_('seven' in self.ls._names())
825
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
826
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
827
    self.assert_('five' not in self.ls._names())
828
    self.assert_('six' not in self.ls._names())
829
    self.assertEquals(self.ls._list_owned(), set(['seven']))
830
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
831
    self.ls.remove('seven')
832
    self.assert_('seven' not in self.ls._names())
833
    self.assertEquals(self.ls._list_owned(), set([]))
834
    self.ls.acquire(None, shared=1)
835
    self.assertRaises(AssertionError, self.ls.add, 'eight')
836
    self.ls.release()
837
    self.ls.acquire(None)
838
    self.ls.add('eight', acquired=1)
839
    self.assert_('eight' in self.ls._names())
840
    self.assert_('eight' in self.ls._list_owned())
841
    self.ls.add('nine')
842
    self.assert_('nine' in self.ls._names())
843
    self.assert_('nine' not in self.ls._list_owned())
844
    self.ls.release()
845
    self.ls.remove(['two'])
846
    self.assert_('two' not in self.ls._names())
847
    self.ls.acquire('three')
848
    self.assertEquals(self.ls.remove(['three']), ['three'])
849
    self.assert_('three' not in self.ls._names())
850
    self.assertEquals(self.ls.remove('three'), [])
851
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
852
    self.assert_('one' not in self.ls._names())
853

    
854
  def testRemoveNonBlocking(self):
855
    self.ls.acquire('one')
856
    self.assertEquals(self.ls.remove('one'), ['one'])
857
    self.ls.acquire(['two', 'three'])
858
    self.assertEquals(self.ls.remove(['two', 'three']),
859
                      ['two', 'three'])
860

    
861
  def testNoDoubleAdd(self):
862
    self.assertRaises(errors.LockError, self.ls.add, 'two')
863
    self.ls.add('four')
864
    self.assertRaises(errors.LockError, self.ls.add, 'four')
865

    
866
  def testNoWrongRemoves(self):
867
    self.ls.acquire(['one', 'three'], shared=1)
868
    # Cannot remove 'two' while holding something which is not a superset
869
    self.assertRaises(AssertionError, self.ls.remove, 'two')
870
    # Cannot remove 'three' as we are sharing it
871
    self.assertRaises(AssertionError, self.ls.remove, 'three')
872

    
873
  def testAcquireSetLock(self):
874
    # acquire the set-lock exclusively
875
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
876
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
877
    self.assertEquals(self.ls._is_owned(), True)
878
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
879
    # I can still add/remove elements...
880
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
881
    self.assert_(self.ls.add('six'))
882
    self.ls.release()
883
    # share the set-lock
884
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
885
    # adding new elements is not possible
886
    self.assertRaises(AssertionError, self.ls.add, 'five')
887
    self.ls.release()
888

    
889
  def testAcquireWithRepetitions(self):
890
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
891
                      set(['two', 'two', 'three']))
892
    self.ls.release(['two', 'two'])
893
    self.assertEquals(self.ls._list_owned(), set(['three']))
894

    
895
  def testEmptyAcquire(self):
896
    # Acquire an empty list of locks...
897
    self.assertEquals(self.ls.acquire([]), set())
898
    self.assertEquals(self.ls._list_owned(), set())
899
    # New locks can still be addded
900
    self.assert_(self.ls.add('six'))
901
    # "re-acquiring" is not an issue, since we had really acquired nothing
902
    self.assertEquals(self.ls.acquire([], shared=1), set())
903
    self.assertEquals(self.ls._list_owned(), set())
904
    # We haven't really acquired anything, so we cannot release
905
    self.assertRaises(AssertionError, self.ls.release)
906

    
907
  def _doLockSet(self, names, shared):
908
    try:
909
      self.ls.acquire(names, shared=shared)
910
      self.done.put('DONE')
911
      self.ls.release()
912
    except errors.LockError:
913
      self.done.put('ERR')
914

    
915
  def _doAddSet(self, names):
916
    try:
917
      self.ls.add(names, acquired=1)
918
      self.done.put('DONE')
919
      self.ls.release()
920
    except errors.LockError:
921
      self.done.put('ERR')
922

    
923
  def _doRemoveSet(self, names):
924
    self.done.put(self.ls.remove(names))
925

    
926
  @_Repeat
927
  def testConcurrentSharedAcquire(self):
928
    self.ls.acquire(['one', 'two'], shared=1)
929
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
930
    self._waitThreads()
931
    self.assertEqual(self.done.get_nowait(), 'DONE')
932
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
933
    self._waitThreads()
934
    self.assertEqual(self.done.get_nowait(), 'DONE')
935
    self._addThread(target=self._doLockSet, args=('three', 1))
936
    self._waitThreads()
937
    self.assertEqual(self.done.get_nowait(), 'DONE')
938
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
939
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
940
    self.assertRaises(Queue.Empty, self.done.get_nowait)
941
    self.ls.release()
942
    self._waitThreads()
943
    self.assertEqual(self.done.get_nowait(), 'DONE')
944
    self.assertEqual(self.done.get_nowait(), 'DONE')
945

    
946
  @_Repeat
947
  def testConcurrentExclusiveAcquire(self):
948
    self.ls.acquire(['one', 'two'])
949
    self._addThread(target=self._doLockSet, args=('three', 1))
950
    self._waitThreads()
951
    self.assertEqual(self.done.get_nowait(), 'DONE')
952
    self._addThread(target=self._doLockSet, args=('three', 0))
953
    self._waitThreads()
954
    self.assertEqual(self.done.get_nowait(), 'DONE')
955
    self.assertRaises(Queue.Empty, self.done.get_nowait)
956
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
957
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
958
    self._addThread(target=self._doLockSet, args=('one', 0))
959
    self._addThread(target=self._doLockSet, args=('one', 1))
960
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
961
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
962
    self.assertRaises(Queue.Empty, self.done.get_nowait)
963
    self.ls.release()
964
    self._waitThreads()
965
    for _ in range(6):
966
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
967

    
968
  @_Repeat
969
  def testConcurrentRemove(self):
970
    self.ls.add('four')
971
    self.ls.acquire(['one', 'two', 'four'])
972
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
973
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
974
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
975
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
976
    self.assertRaises(Queue.Empty, self.done.get_nowait)
977
    self.ls.remove('one')
978
    self.ls.release()
979
    self._waitThreads()
980
    for i in range(4):
981
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
982
    self.ls.add(['five', 'six'], acquired=1)
983
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
984
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
985
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
986
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
987
    self.ls.remove('five')
988
    self.ls.release()
989
    self._waitThreads()
990
    for i in range(4):
991
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
992
    self.ls.acquire(['three', 'four'])
993
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
994
    self.assertRaises(Queue.Empty, self.done.get_nowait)
995
    self.ls.remove('four')
996
    self._waitThreads()
997
    self.assertEqual(self.done.get_nowait(), ['six'])
998
    self._addThread(target=self._doRemoveSet, args=(['two']))
999
    self._waitThreads()
1000
    self.assertEqual(self.done.get_nowait(), ['two'])
1001
    self.ls.release()
1002
    # reset lockset
1003
    self._setUpLS()
1004

    
1005
  @_Repeat
1006
  def testConcurrentSharedSetLock(self):
1007
    # share the set-lock...
1008
    self.ls.acquire(None, shared=1)
1009
    # ...another thread can share it too
1010
    self._addThread(target=self._doLockSet, args=(None, 1))
1011
    self._waitThreads()
1012
    self.assertEqual(self.done.get_nowait(), 'DONE')
1013
    # ...or just share some elements
1014
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1015
    self._waitThreads()
1016
    self.assertEqual(self.done.get_nowait(), 'DONE')
1017
    # ...but not add new ones or remove any
1018
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1019
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1020
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1021
    # this just releases the set-lock
1022
    self.ls.release([])
1023
    t.join(60)
1024
    self.assertEqual(self.done.get_nowait(), 'DONE')
1025
    # release the lock on the actual elements so remove() can proceed too
1026
    self.ls.release()
1027
    self._waitThreads()
1028
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1029
    # reset lockset
1030
    self._setUpLS()
1031

    
1032
  @_Repeat
1033
  def testConcurrentExclusiveSetLock(self):
1034
    # acquire the set-lock...
1035
    self.ls.acquire(None, shared=0)
1036
    # ...no one can do anything else
1037
    self._addThread(target=self._doLockSet, args=(None, 1))
1038
    self._addThread(target=self._doLockSet, args=(None, 0))
1039
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1040
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1041
    self._addThread(target=self._doAddSet, args=(['nine']))
1042
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1043
    self.ls.release()
1044
    self._waitThreads()
1045
    for _ in range(5):
1046
      self.assertEqual(self.done.get(True, 1), 'DONE')
1047
    # cleanup
1048
    self._setUpLS()
1049

    
1050
  @_Repeat
1051
  def testConcurrentSetLockAdd(self):
1052
    self.ls.acquire('one')
1053
    # Another thread wants the whole SetLock
1054
    self._addThread(target=self._doLockSet, args=(None, 0))
1055
    self._addThread(target=self._doLockSet, args=(None, 1))
1056
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1057
    self.assertRaises(AssertionError, self.ls.add, 'four')
1058
    self.ls.release()
1059
    self._waitThreads()
1060
    self.assertEqual(self.done.get_nowait(), 'DONE')
1061
    self.assertEqual(self.done.get_nowait(), 'DONE')
1062
    self.ls.acquire(None)
1063
    self._addThread(target=self._doLockSet, args=(None, 0))
1064
    self._addThread(target=self._doLockSet, args=(None, 1))
1065
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1066
    self.ls.add('four')
1067
    self.ls.add('five', acquired=1)
1068
    self.ls.add('six', acquired=1, shared=1)
1069
    self.assertEquals(self.ls._list_owned(),
1070
      set(['one', 'two', 'three', 'five', 'six']))
1071
    self.assertEquals(self.ls._is_owned(), True)
1072
    self.assertEquals(self.ls._names(),
1073
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1074
    self.ls.release()
1075
    self._waitThreads()
1076
    self.assertEqual(self.done.get_nowait(), 'DONE')
1077
    self.assertEqual(self.done.get_nowait(), 'DONE')
1078
    self._setUpLS()
1079

    
1080
  @_Repeat
1081
  def testEmptyLockSet(self):
1082
    # get the set-lock
1083
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1084
    # now empty it...
1085
    self.ls.remove(['one', 'two', 'three'])
1086
    # and adds/locks by another thread still wait
1087
    self._addThread(target=self._doAddSet, args=(['nine']))
1088
    self._addThread(target=self._doLockSet, args=(None, 1))
1089
    self._addThread(target=self._doLockSet, args=(None, 0))
1090
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1091
    self.ls.release()
1092
    self._waitThreads()
1093
    for _ in range(3):
1094
      self.assertEqual(self.done.get_nowait(), 'DONE')
1095
    # empty it again...
1096
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1097
    # now share it...
1098
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1099
    # other sharers can go, adds still wait
1100
    self._addThread(target=self._doLockSet, args=(None, 1))
1101
    self._waitThreads()
1102
    self.assertEqual(self.done.get_nowait(), 'DONE')
1103
    self._addThread(target=self._doAddSet, args=(['nine']))
1104
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1105
    self.ls.release()
1106
    self._waitThreads()
1107
    self.assertEqual(self.done.get_nowait(), 'DONE')
1108
    self._setUpLS()
1109

    
1110

    
1111
class TestGanetiLockManager(_ThreadedTestCase):
1112

    
1113
  def setUp(self):
1114
    _ThreadedTestCase.setUp(self)
1115
    self.nodes=['n1', 'n2']
1116
    self.instances=['i1', 'i2', 'i3']
1117
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1118
                                        instances=self.instances)
1119

    
1120
  def tearDown(self):
1121
    # Don't try this at home...
1122
    locking.GanetiLockManager._instance = None
1123

    
1124
  def testLockingConstants(self):
1125
    # The locking library internally cheats by assuming its constants have some
1126
    # relationships with each other. Check those hold true.
1127
    # This relationship is also used in the Processor to recursively acquire
1128
    # the right locks. Again, please don't break it.
1129
    for i in range(len(locking.LEVELS)):
1130
      self.assertEqual(i, locking.LEVELS[i])
1131

    
1132
  def testDoubleGLFails(self):
1133
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1134

    
1135
  def testLockNames(self):
1136
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1137
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1138
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1139
                     set(self.instances))
1140

    
1141
  def testInitAndResources(self):
1142
    locking.GanetiLockManager._instance = None
1143
    self.GL = locking.GanetiLockManager()
1144
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1145
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1146
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1147

    
1148
    locking.GanetiLockManager._instance = None
1149
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1150
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1151
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1152
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1153

    
1154
    locking.GanetiLockManager._instance = None
1155
    self.GL = locking.GanetiLockManager(instances=self.instances)
1156
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1157
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1158
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1159
                     set(self.instances))
1160

    
1161
  def testAcquireRelease(self):
1162
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1163
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1164
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1165
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1166
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1167
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1168
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1169
    self.GL.release(locking.LEVEL_NODE)
1170
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1171
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1172
    self.GL.release(locking.LEVEL_INSTANCE)
1173
    self.assertRaises(errors.LockError, self.GL.acquire,
1174
                      locking.LEVEL_INSTANCE, ['i5'])
1175
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1176
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1177

    
1178
  def testAcquireWholeSets(self):
1179
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1180
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1181
                      set(self.instances))
1182
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1183
                      set(self.instances))
1184
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1185
                      set(self.nodes))
1186
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1187
                      set(self.nodes))
1188
    self.GL.release(locking.LEVEL_NODE)
1189
    self.GL.release(locking.LEVEL_INSTANCE)
1190
    self.GL.release(locking.LEVEL_CLUSTER)
1191

    
1192
  def testAcquireWholeAndPartial(self):
1193
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1194
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1195
                      set(self.instances))
1196
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1197
                      set(self.instances))
1198
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1199
                      set(['n2']))
1200
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1201
                      set(['n2']))
1202
    self.GL.release(locking.LEVEL_NODE)
1203
    self.GL.release(locking.LEVEL_INSTANCE)
1204
    self.GL.release(locking.LEVEL_CLUSTER)
1205

    
1206
  def testBGLDependency(self):
1207
    self.assertRaises(AssertionError, self.GL.acquire,
1208
                      locking.LEVEL_NODE, ['n1', 'n2'])
1209
    self.assertRaises(AssertionError, self.GL.acquire,
1210
                      locking.LEVEL_INSTANCE, ['i3'])
1211
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1212
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1213
    self.assertRaises(AssertionError, self.GL.release,
1214
                      locking.LEVEL_CLUSTER, ['BGL'])
1215
    self.assertRaises(AssertionError, self.GL.release,
1216
                      locking.LEVEL_CLUSTER)
1217
    self.GL.release(locking.LEVEL_NODE)
1218
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1219
    self.assertRaises(AssertionError, self.GL.release,
1220
                      locking.LEVEL_CLUSTER, ['BGL'])
1221
    self.assertRaises(AssertionError, self.GL.release,
1222
                      locking.LEVEL_CLUSTER)
1223
    self.GL.release(locking.LEVEL_INSTANCE)
1224

    
1225
  def testWrongOrder(self):
1226
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1227
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1228
    self.assertRaises(AssertionError, self.GL.acquire,
1229
                      locking.LEVEL_NODE, ['n1'])
1230
    self.assertRaises(AssertionError, self.GL.acquire,
1231
                      locking.LEVEL_INSTANCE, ['i2'])
1232

    
1233
  # Helper function to run as a thread that shared the BGL and then acquires
1234
  # some locks at another level.
1235
  def _doLock(self, level, names, shared):
1236
    try:
1237
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1238
      self.GL.acquire(level, names, shared=shared)
1239
      self.done.put('DONE')
1240
      self.GL.release(level)
1241
      self.GL.release(locking.LEVEL_CLUSTER)
1242
    except errors.LockError:
1243
      self.done.put('ERR')
1244

    
1245
  @_Repeat
1246
  def testConcurrency(self):
1247
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1248
    self._addThread(target=self._doLock,
1249
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1250
    self._waitThreads()
1251
    self.assertEqual(self.done.get_nowait(), 'DONE')
1252
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1253
    self._addThread(target=self._doLock,
1254
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1255
    self._waitThreads()
1256
    self.assertEqual(self.done.get_nowait(), 'DONE')
1257
    self._addThread(target=self._doLock,
1258
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1259
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1260
    self.GL.release(locking.LEVEL_INSTANCE)
1261
    self._waitThreads()
1262
    self.assertEqual(self.done.get_nowait(), 'DONE')
1263
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1264
    self._addThread(target=self._doLock,
1265
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1266
    self._waitThreads()
1267
    self.assertEqual(self.done.get_nowait(), 'DONE')
1268
    self._addThread(target=self._doLock,
1269
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1270
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1271
    self.GL.release(locking.LEVEL_INSTANCE)
1272
    self._waitThreads()
1273
    self.assertEqual(self.done.get(True, 1), 'DONE')
1274
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1275

    
1276

    
1277
if __name__ == '__main__':
1278
  unittest.main()
1279
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1280
  #unittest.TextTestRunner(verbosity=2).run(suite)