Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 34cb5617

History | View | Annotate | Download (42.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.done = Queue.Queue(0)
56
    self.threads = []
57

    
58
  def _addThread(self, *args, **kwargs):
59
    """Create and remember a new thread"""
60
    t = threading.Thread(*args, **kwargs)
61
    self.threads.append(t)
62
    t.start()
63
    return t
64

    
65
  def _waitThreads(self):
66
    """Wait for all our threads to finish"""
67
    for t in self.threads:
68
      t.join(60)
69
      self.failIf(t.isAlive())
70
    self.threads = []
71

    
72

    
73
class _ConditionTestCase(_ThreadedTestCase):
74
  """Common test case for conditions"""
75

    
76
  def setUp(self, cls):
77
    _ThreadedTestCase.setUp(self)
78
    self.lock = threading.Lock()
79
    self.cond = cls(self.lock)
80

    
81
  def _testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def _testNotification(self):
97
    def _NotifyAll():
98
      self.done.put("NE")
99
      self.cond.acquire()
100
      self.done.put("NA")
101
      self.cond.notifyAll()
102
      self.done.put("NN")
103
      self.cond.release()
104

    
105
    self.cond.acquire()
106
    self._addThread(target=_NotifyAll)
107
    self.assertEqual(self.done.get(True, 1), "NE")
108
    self.assertRaises(Queue.Empty, self.done.get_nowait)
109
    self.cond.wait()
110
    self.assertEqual(self.done.get(True, 1), "NA")
111
    self.assertEqual(self.done.get(True, 1), "NN")
112
    self.assert_(self.cond._is_owned())
113
    self.cond.release()
114
    self.assert_(not self.cond._is_owned())
115

    
116

    
117
class TestSingleNotifyPipeCondition(_ConditionTestCase):
118
  """SingleNotifyPipeCondition tests"""
119

    
120
  def setUp(self):
121
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
122

    
123
  def testAcquireRelease(self):
124
    self._testAcquireRelease()
125

    
126
  def testNotification(self):
127
    self._testNotification()
128

    
129
  def testWaitReuse(self):
130
    self.cond.acquire()
131
    self.cond.wait(0)
132
    self.cond.wait(0.1)
133
    self.cond.release()
134

    
135
  def testNoNotifyReuse(self):
136
    self.cond.acquire()
137
    self.cond.notifyAll()
138
    self.assertRaises(RuntimeError, self.cond.wait)
139
    self.assertRaises(RuntimeError, self.cond.notifyAll)
140
    self.cond.release()
141

    
142

    
143
class TestPipeCondition(_ConditionTestCase):
144
  """PipeCondition tests"""
145

    
146
  def setUp(self):
147
    _ConditionTestCase.setUp(self, locking.PipeCondition)
148

    
149
  def testAcquireRelease(self):
150
    self._testAcquireRelease()
151

    
152
  def testNotification(self):
153
    self._testNotification()
154

    
155
  def _TestWait(self, fn):
156
    self._addThread(target=fn)
157
    self._addThread(target=fn)
158
    self._addThread(target=fn)
159

    
160
    # Wait for threads to be waiting
161
    self.assertEqual(self.done.get(True, 1), "A")
162
    self.assertEqual(self.done.get(True, 1), "A")
163
    self.assertEqual(self.done.get(True, 1), "A")
164

    
165
    self.assertRaises(Queue.Empty, self.done.get_nowait)
166

    
167
    self.cond.acquire()
168
    self.assertEqual(self.cond._nwaiters, 3)
169
    # This new thread can"t acquire the lock, and thus call wait, before we
170
    # release it
171
    self._addThread(target=fn)
172
    self.cond.notifyAll()
173
    self.assertRaises(Queue.Empty, self.done.get_nowait)
174
    self.cond.release()
175

    
176
    # We should now get 3 W and 1 A (for the new thread) in whatever order
177
    w = 0
178
    a = 0
179
    for i in range(4):
180
      got = self.done.get(True, 1)
181
      if got == "W":
182
        w += 1
183
      elif got == "A":
184
        a += 1
185
      else:
186
        self.fail("Got %s on the done queue" % got)
187

    
188
    self.assertEqual(w, 3)
189
    self.assertEqual(a, 1)
190

    
191
    self.cond.acquire()
192
    self.cond.notifyAll()
193
    self.cond.release()
194
    self._waitThreads()
195
    self.assertEqual(self.done.get_nowait(), "W")
196
    self.assertRaises(Queue.Empty, self.done.get_nowait)
197

    
198
  def testBlockingWait(self):
199
    def _BlockingWait():
200
      self.cond.acquire()
201
      self.done.put("A")
202
      self.cond.wait()
203
      self.cond.release()
204
      self.done.put("W")
205

    
206
    self._TestWait(_BlockingWait)
207

    
208
  def testLongTimeoutWait(self):
209
    def _Helper():
210
      self.cond.acquire()
211
      self.done.put("A")
212
      self.cond.wait(15.0)
213
      self.cond.release()
214
      self.done.put("W")
215

    
216
    self._TestWait(_Helper)
217

    
218
  def _TimeoutWait(self, timeout, check):
219
    self.cond.acquire()
220
    self.cond.wait(timeout)
221
    self.cond.release()
222
    self.done.put(check)
223

    
224
  def testShortTimeoutWait(self):
225
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
226
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
227
    self._waitThreads()
228
    self.assertEqual(self.done.get_nowait(), "T1")
229
    self.assertEqual(self.done.get_nowait(), "T1")
230
    self.assertRaises(Queue.Empty, self.done.get_nowait)
231

    
232
  def testZeroTimeoutWait(self):
233
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
234
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
235
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
236
    self._waitThreads()
237
    self.assertEqual(self.done.get_nowait(), "T0")
238
    self.assertEqual(self.done.get_nowait(), "T0")
239
    self.assertEqual(self.done.get_nowait(), "T0")
240
    self.assertRaises(Queue.Empty, self.done.get_nowait)
241

    
242

    
243
class TestSharedLock(_ThreadedTestCase):
244
  """SharedLock tests"""
245

    
246
  def setUp(self):
247
    _ThreadedTestCase.setUp(self)
248
    self.sl = locking.SharedLock()
249

    
250
  def testSequenceAndOwnership(self):
251
    self.assert_(not self.sl._is_owned())
252
    self.sl.acquire(shared=1)
253
    self.assert_(self.sl._is_owned())
254
    self.assert_(self.sl._is_owned(shared=1))
255
    self.assert_(not self.sl._is_owned(shared=0))
256
    self.sl.release()
257
    self.assert_(not self.sl._is_owned())
258
    self.sl.acquire()
259
    self.assert_(self.sl._is_owned())
260
    self.assert_(not self.sl._is_owned(shared=1))
261
    self.assert_(self.sl._is_owned(shared=0))
262
    self.sl.release()
263
    self.assert_(not self.sl._is_owned())
264
    self.sl.acquire(shared=1)
265
    self.assert_(self.sl._is_owned())
266
    self.assert_(self.sl._is_owned(shared=1))
267
    self.assert_(not self.sl._is_owned(shared=0))
268
    self.sl.release()
269
    self.assert_(not self.sl._is_owned())
270

    
271
  def testBooleanValue(self):
272
    # semaphores are supposed to return a true value on a successful acquire
273
    self.assert_(self.sl.acquire(shared=1))
274
    self.sl.release()
275
    self.assert_(self.sl.acquire())
276
    self.sl.release()
277

    
278
  def testDoubleLockingStoE(self):
279
    self.sl.acquire(shared=1)
280
    self.assertRaises(AssertionError, self.sl.acquire)
281

    
282
  def testDoubleLockingEtoS(self):
283
    self.sl.acquire()
284
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
285

    
286
  def testDoubleLockingStoS(self):
287
    self.sl.acquire(shared=1)
288
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
289

    
290
  def testDoubleLockingEtoE(self):
291
    self.sl.acquire()
292
    self.assertRaises(AssertionError, self.sl.acquire)
293

    
294
  # helper functions: called in a separate thread they acquire the lock, send
295
  # their identifier on the done queue, then release it.
296
  def _doItSharer(self):
297
    try:
298
      self.sl.acquire(shared=1)
299
      self.done.put('SHR')
300
      self.sl.release()
301
    except errors.LockError:
302
      self.done.put('ERR')
303

    
304
  def _doItExclusive(self):
305
    try:
306
      self.sl.acquire()
307
      self.done.put('EXC')
308
      self.sl.release()
309
    except errors.LockError:
310
      self.done.put('ERR')
311

    
312
  def _doItDelete(self):
313
    try:
314
      self.sl.delete()
315
      self.done.put('DEL')
316
    except errors.LockError:
317
      self.done.put('ERR')
318

    
319
  def testSharersCanCoexist(self):
320
    self.sl.acquire(shared=1)
321
    threading.Thread(target=self._doItSharer).start()
322
    self.assert_(self.done.get(True, 1))
323
    self.sl.release()
324

    
325
  @_Repeat
326
  def testExclusiveBlocksExclusive(self):
327
    self.sl.acquire()
328
    self._addThread(target=self._doItExclusive)
329
    self.assertRaises(Queue.Empty, self.done.get_nowait)
330
    self.sl.release()
331
    self._waitThreads()
332
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
333

    
334
  @_Repeat
335
  def testExclusiveBlocksDelete(self):
336
    self.sl.acquire()
337
    self._addThread(target=self._doItDelete)
338
    self.assertRaises(Queue.Empty, self.done.get_nowait)
339
    self.sl.release()
340
    self._waitThreads()
341
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
342
    self.sl = locking.SharedLock()
343

    
344
  @_Repeat
345
  def testExclusiveBlocksSharer(self):
346
    self.sl.acquire()
347
    self._addThread(target=self._doItSharer)
348
    self.assertRaises(Queue.Empty, self.done.get_nowait)
349
    self.sl.release()
350
    self._waitThreads()
351
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
352

    
353
  @_Repeat
354
  def testSharerBlocksExclusive(self):
355
    self.sl.acquire(shared=1)
356
    self._addThread(target=self._doItExclusive)
357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
358
    self.sl.release()
359
    self._waitThreads()
360
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
361

    
362
  @_Repeat
363
  def testSharerBlocksDelete(self):
364
    self.sl.acquire(shared=1)
365
    self._addThread(target=self._doItDelete)
366
    self.assertRaises(Queue.Empty, self.done.get_nowait)
367
    self.sl.release()
368
    self._waitThreads()
369
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
370
    self.sl = locking.SharedLock()
371

    
372
  @_Repeat
373
  def testWaitingExclusiveBlocksSharer(self):
374
    """SKIPPED testWaitingExclusiveBlockSharer"""
375
    return
376

    
377
    self.sl.acquire(shared=1)
378
    # the lock is acquired in shared mode...
379
    self._addThread(target=self._doItExclusive)
380
    # ...but now an exclusive is waiting...
381
    self._addThread(target=self._doItSharer)
382
    # ...so the sharer should be blocked as well
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384
    self.sl.release()
385
    self._waitThreads()
386
    # The exclusive passed before
387
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
388
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
389

    
390
  @_Repeat
391
  def testWaitingSharerBlocksExclusive(self):
392
    """SKIPPED testWaitingSharerBlocksExclusive"""
393
    return
394

    
395
    self.sl.acquire()
396
    # the lock is acquired in exclusive mode...
397
    self._addThread(target=self._doItSharer)
398
    # ...but now a sharer is waiting...
399
    self._addThread(target=self._doItExclusive)
400
    # ...the exclusive is waiting too...
401
    self.assertRaises(Queue.Empty, self.done.get_nowait)
402
    self.sl.release()
403
    self._waitThreads()
404
    # The sharer passed before
405
    self.assertEqual(self.done.get_nowait(), 'SHR')
406
    self.assertEqual(self.done.get_nowait(), 'EXC')
407

    
408
  def testDelete(self):
409
    self.sl.delete()
410
    self.assertRaises(errors.LockError, self.sl.acquire)
411
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
412
    self.assertRaises(errors.LockError, self.sl.delete)
413

    
414
  def testDeleteTimeout(self):
415
    self.sl.delete(timeout=60)
416

    
417
  def testNoDeleteIfSharer(self):
418
    self.sl.acquire(shared=1)
419
    self.assertRaises(AssertionError, self.sl.delete)
420

    
421
  @_Repeat
422
  def testDeletePendingSharersExclusiveDelete(self):
423
    self.sl.acquire()
424
    self._addThread(target=self._doItSharer)
425
    self._addThread(target=self._doItSharer)
426
    self._addThread(target=self._doItExclusive)
427
    self._addThread(target=self._doItDelete)
428
    self.sl.delete()
429
    self._waitThreads()
430
    # The threads who were pending return ERR
431
    for _ in range(4):
432
      self.assertEqual(self.done.get_nowait(), 'ERR')
433
    self.sl = locking.SharedLock()
434

    
435
  @_Repeat
436
  def testDeletePendingDeleteExclusiveSharers(self):
437
    self.sl.acquire()
438
    self._addThread(target=self._doItDelete)
439
    self._addThread(target=self._doItExclusive)
440
    self._addThread(target=self._doItSharer)
441
    self._addThread(target=self._doItSharer)
442
    self.sl.delete()
443
    self._waitThreads()
444
    # The two threads who were pending return both ERR
445
    self.assertEqual(self.done.get_nowait(), 'ERR')
446
    self.assertEqual(self.done.get_nowait(), 'ERR')
447
    self.assertEqual(self.done.get_nowait(), 'ERR')
448
    self.assertEqual(self.done.get_nowait(), 'ERR')
449
    self.sl = locking.SharedLock()
450

    
451
  @_Repeat
452
  def testExclusiveAcquireTimeout(self):
453
    def _LockExclusive(wait):
454
      self.sl.acquire(shared=0)
455
      self.done.put("A: start sleep")
456
      time.sleep(wait)
457
      self.done.put("A: end sleep")
458
      self.sl.release()
459

    
460
    for shared in [0, 1]:
461
      # Start thread to hold lock for 20 ms
462
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
463

    
464
      # Wait for sleep to begin
465
      self.assertEqual(self.done.get(), "A: start sleep")
466

    
467
      # Wait up to 100 ms to get lock
468
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
469
      self.done.put("got 2nd")
470
      self.sl.release()
471

    
472
      self._waitThreads()
473

    
474
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
475
      self.assertEqual(self.done.get_nowait(), "got 2nd")
476
      self.assertRaises(Queue.Empty, self.done.get_nowait)
477

    
478
  @_Repeat
479
  def testAcquireExpiringTimeout(self):
480
    def _AcquireWithTimeout(shared, timeout):
481
      if not self.sl.acquire(shared=shared, timeout=timeout):
482
        self.done.put("timeout")
483

    
484
    for shared in [0, 1]:
485
      # Lock exclusively
486
      self.sl.acquire()
487

    
488
      # Start shared acquires with timeout between 0 and 20 ms
489
      for i in xrange(11):
490
        self._addThread(target=_AcquireWithTimeout,
491
                        args=(shared, i * 2.0 / 1000.0))
492

    
493
      # Wait for threads to finish (makes sure the acquire timeout expires
494
      # before releasing the lock)
495
      self._waitThreads()
496

    
497
      # Release lock
498
      self.sl.release()
499

    
500
      for _ in xrange(11):
501
        self.assertEqual(self.done.get_nowait(), "timeout")
502

    
503
      self.assertRaises(Queue.Empty, self.done.get_nowait)
504

    
505
  @_Repeat
506
  def testSharedSkipExclusiveAcquires(self):
507
    # Tests whether shared acquires jump in front of exclusive acquires in the
508
    # queue.
509

    
510
    # Get exclusive lock while we fill the queue
511
    self.sl.acquire()
512

    
513
    def _Acquire(shared, name):
514
      if not self.sl.acquire(shared=shared):
515
        return
516

    
517
      self.done.put(name)
518
      self.sl.release()
519

    
520
    # Start shared acquires
521
    for _ in xrange(5):
522
      self._addThread(target=_Acquire, args=(1, "shared A"))
523

    
524
    # Start exclusive acquires
525
    for _ in xrange(3):
526
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
527

    
528
    # More shared acquires
529
    for _ in xrange(5):
530
      self._addThread(target=_Acquire, args=(1, "shared C"))
531

    
532
    # More exclusive acquires
533
    for _ in xrange(3):
534
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
535

    
536
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
537
    # together. There's no way to wait for SharedLock.acquire to start
538
    # its work. Hence the timeout of 2 seconds.
539
    pending = 0
540
    end_time = time.time() + 2.0
541
    while time.time() < end_time:
542
      pending = self.sl._count_pending()
543
      self.assert_(pending >= 0 and pending <= 7)
544
      if pending == 7:
545
        break
546
      time.sleep(0.05)
547
    self.assertEqual(pending, 7)
548

    
549
    # Release exclusive lock and wait
550
    self.sl.release()
551

    
552
    self._waitThreads()
553

    
554
    # Check sequence
555
    shr_a = 0
556
    shr_c = 0
557
    for _ in xrange(10):
558
      # Shared locks aren't guaranteed to be notified in order, but they'll be
559
      # first
560
      tmp = self.done.get_nowait()
561
      if tmp == "shared A":
562
        shr_a += 1
563
      elif tmp == "shared C":
564
        shr_c += 1
565
    self.assertEqual(shr_a, 5)
566
    self.assertEqual(shr_c, 5)
567

    
568
    for _ in xrange(3):
569
      self.assertEqual(self.done.get_nowait(), "exclusive B")
570

    
571
    for _ in xrange(3):
572
      self.assertEqual(self.done.get_nowait(), "exclusive D")
573

    
574
    self.assertRaises(Queue.Empty, self.done.get_nowait)
575

    
576
  @_Repeat
577
  def testMixedAcquireTimeout(self):
578
    sync = threading.Condition()
579

    
580
    def _AcquireShared(ev):
581
      if not self.sl.acquire(shared=1, timeout=None):
582
        return
583

    
584
      self.done.put("shared")
585

    
586
      # Notify main thread
587
      ev.set()
588

    
589
      # Wait for notification
590
      sync.acquire()
591
      try:
592
        sync.wait()
593
      finally:
594
        sync.release()
595

    
596
      # Release lock
597
      self.sl.release()
598

    
599
    acquires = []
600
    for _ in xrange(3):
601
      ev = threading.Event()
602
      self._addThread(target=_AcquireShared, args=(ev, ))
603
      acquires.append(ev)
604

    
605
    # Wait for all acquires to finish
606
    for i in acquires:
607
      i.wait()
608

    
609
    self.assertEqual(self.sl._count_pending(), 0)
610

    
611
    # Try to get exclusive lock
612
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
613

    
614
    # Acquire exclusive without timeout
615
    exclsync = threading.Condition()
616
    exclev = threading.Event()
617

    
618
    def _AcquireExclusive():
619
      if not self.sl.acquire(shared=0):
620
        return
621

    
622
      self.done.put("exclusive")
623

    
624
      # Notify main thread
625
      exclev.set()
626

    
627
      exclsync.acquire()
628
      try:
629
        exclsync.wait()
630
      finally:
631
        exclsync.release()
632

    
633
      self.sl.release()
634

    
635
    self._addThread(target=_AcquireExclusive)
636

    
637
    # Try to get exclusive lock
638
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
639

    
640
    # Make all shared holders release their locks
641
    sync.acquire()
642
    try:
643
      sync.notifyAll()
644
    finally:
645
      sync.release()
646

    
647
    # Wait for exclusive acquire to succeed
648
    exclev.wait()
649

    
650
    self.assertEqual(self.sl._count_pending(), 0)
651

    
652
    # Try to get exclusive lock
653
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
654

    
655
    def _AcquireSharedSimple():
656
      if self.sl.acquire(shared=1, timeout=None):
657
        self.done.put("shared2")
658
        self.sl.release()
659

    
660
    for _ in xrange(10):
661
      self._addThread(target=_AcquireSharedSimple)
662

    
663
    # Tell exclusive lock to release
664
    exclsync.acquire()
665
    try:
666
      exclsync.notifyAll()
667
    finally:
668
      exclsync.release()
669

    
670
    # Wait for everything to finish
671
    self._waitThreads()
672

    
673
    self.assertEqual(self.sl._count_pending(), 0)
674

    
675
    # Check sequence
676
    for _ in xrange(3):
677
      self.assertEqual(self.done.get_nowait(), "shared")
678

    
679
    self.assertEqual(self.done.get_nowait(), "exclusive")
680

    
681
    for _ in xrange(10):
682
      self.assertEqual(self.done.get_nowait(), "shared2")
683

    
684
    self.assertRaises(Queue.Empty, self.done.get_nowait)
685

    
686

    
687
class TestSSynchronizedDecorator(_ThreadedTestCase):
688
  """Shared Lock Synchronized decorator test"""
689

    
690
  def setUp(self):
691
    _ThreadedTestCase.setUp(self)
692

    
693
  @locking.ssynchronized(_decoratorlock)
694
  def _doItExclusive(self):
695
    self.assert_(_decoratorlock._is_owned())
696
    self.done.put('EXC')
697

    
698
  @locking.ssynchronized(_decoratorlock, shared=1)
699
  def _doItSharer(self):
700
    self.assert_(_decoratorlock._is_owned(shared=1))
701
    self.done.put('SHR')
702

    
703
  def testDecoratedFunctions(self):
704
    self._doItExclusive()
705
    self.assert_(not _decoratorlock._is_owned())
706
    self._doItSharer()
707
    self.assert_(not _decoratorlock._is_owned())
708

    
709
  def testSharersCanCoexist(self):
710
    _decoratorlock.acquire(shared=1)
711
    threading.Thread(target=self._doItSharer).start()
712
    self.assert_(self.done.get(True, 1))
713
    _decoratorlock.release()
714

    
715
  @_Repeat
716
  def testExclusiveBlocksExclusive(self):
717
    _decoratorlock.acquire()
718
    self._addThread(target=self._doItExclusive)
719
    # give it a bit of time to check that it's not actually doing anything
720
    self.assertRaises(Queue.Empty, self.done.get_nowait)
721
    _decoratorlock.release()
722
    self._waitThreads()
723
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
724

    
725
  @_Repeat
726
  def testExclusiveBlocksSharer(self):
727
    _decoratorlock.acquire()
728
    self._addThread(target=self._doItSharer)
729
    self.assertRaises(Queue.Empty, self.done.get_nowait)
730
    _decoratorlock.release()
731
    self._waitThreads()
732
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
733

    
734
  @_Repeat
735
  def testSharerBlocksExclusive(self):
736
    _decoratorlock.acquire(shared=1)
737
    self._addThread(target=self._doItExclusive)
738
    self.assertRaises(Queue.Empty, self.done.get_nowait)
739
    _decoratorlock.release()
740
    self._waitThreads()
741
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
742

    
743

    
744
class TestLockSet(_ThreadedTestCase):
745
  """LockSet tests"""
746

    
747
  def setUp(self):
748
    _ThreadedTestCase.setUp(self)
749
    self._setUpLS()
750

    
751
  def _setUpLS(self):
752
    """Helper to (re)initialize the lock set"""
753
    self.resources = ['one', 'two', 'three']
754
    self.ls = locking.LockSet(members=self.resources)
755

    
756
  def testResources(self):
757
    self.assertEquals(self.ls._names(), set(self.resources))
758
    newls = locking.LockSet()
759
    self.assertEquals(newls._names(), set())
760

    
761
  def testAcquireRelease(self):
762
    self.assert_(self.ls.acquire('one'))
763
    self.assertEquals(self.ls._list_owned(), set(['one']))
764
    self.ls.release()
765
    self.assertEquals(self.ls._list_owned(), set())
766
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
767
    self.assertEquals(self.ls._list_owned(), set(['one']))
768
    self.ls.release()
769
    self.assertEquals(self.ls._list_owned(), set())
770
    self.ls.acquire(['one', 'two', 'three'])
771
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
772
    self.ls.release('one')
773
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
774
    self.ls.release(['three'])
775
    self.assertEquals(self.ls._list_owned(), set(['two']))
776
    self.ls.release()
777
    self.assertEquals(self.ls._list_owned(), set())
778
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
779
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
780
    self.ls.release()
781
    self.assertEquals(self.ls._list_owned(), set())
782

    
783
  def testNoDoubleAcquire(self):
784
    self.ls.acquire('one')
785
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
786
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
787
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
788
    self.ls.release()
789
    self.ls.acquire(['one', 'three'])
790
    self.ls.release('one')
791
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
792
    self.ls.release('three')
793

    
794
  def testNoWrongRelease(self):
795
    self.assertRaises(AssertionError, self.ls.release)
796
    self.ls.acquire('one')
797
    self.assertRaises(AssertionError, self.ls.release, 'two')
798

    
799
  def testAddRemove(self):
800
    self.ls.add('four')
801
    self.assertEquals(self.ls._list_owned(), set())
802
    self.assert_('four' in self.ls._names())
803
    self.ls.add(['five', 'six', 'seven'], acquired=1)
804
    self.assert_('five' in self.ls._names())
805
    self.assert_('six' in self.ls._names())
806
    self.assert_('seven' in self.ls._names())
807
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
808
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
809
    self.assert_('five' not in self.ls._names())
810
    self.assert_('six' not in self.ls._names())
811
    self.assertEquals(self.ls._list_owned(), set(['seven']))
812
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
813
    self.ls.remove('seven')
814
    self.assert_('seven' not in self.ls._names())
815
    self.assertEquals(self.ls._list_owned(), set([]))
816
    self.ls.acquire(None, shared=1)
817
    self.assertRaises(AssertionError, self.ls.add, 'eight')
818
    self.ls.release()
819
    self.ls.acquire(None)
820
    self.ls.add('eight', acquired=1)
821
    self.assert_('eight' in self.ls._names())
822
    self.assert_('eight' in self.ls._list_owned())
823
    self.ls.add('nine')
824
    self.assert_('nine' in self.ls._names())
825
    self.assert_('nine' not in self.ls._list_owned())
826
    self.ls.release()
827
    self.ls.remove(['two'])
828
    self.assert_('two' not in self.ls._names())
829
    self.ls.acquire('three')
830
    self.assertEquals(self.ls.remove(['three']), ['three'])
831
    self.assert_('three' not in self.ls._names())
832
    self.assertEquals(self.ls.remove('three'), [])
833
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
834
    self.assert_('one' not in self.ls._names())
835

    
836
  def testRemoveNonBlocking(self):
837
    self.ls.acquire('one')
838
    self.assertEquals(self.ls.remove('one'), ['one'])
839
    self.ls.acquire(['two', 'three'])
840
    self.assertEquals(self.ls.remove(['two', 'three']),
841
                      ['two', 'three'])
842

    
843
  def testNoDoubleAdd(self):
844
    self.assertRaises(errors.LockError, self.ls.add, 'two')
845
    self.ls.add('four')
846
    self.assertRaises(errors.LockError, self.ls.add, 'four')
847

    
848
  def testNoWrongRemoves(self):
849
    self.ls.acquire(['one', 'three'], shared=1)
850
    # Cannot remove 'two' while holding something which is not a superset
851
    self.assertRaises(AssertionError, self.ls.remove, 'two')
852
    # Cannot remove 'three' as we are sharing it
853
    self.assertRaises(AssertionError, self.ls.remove, 'three')
854

    
855
  def testAcquireSetLock(self):
856
    # acquire the set-lock exclusively
857
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
858
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
859
    self.assertEquals(self.ls._is_owned(), True)
860
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
861
    # I can still add/remove elements...
862
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
863
    self.assert_(self.ls.add('six'))
864
    self.ls.release()
865
    # share the set-lock
866
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
867
    # adding new elements is not possible
868
    self.assertRaises(AssertionError, self.ls.add, 'five')
869
    self.ls.release()
870

    
871
  def testAcquireWithRepetitions(self):
872
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
873
                      set(['two', 'two', 'three']))
874
    self.ls.release(['two', 'two'])
875
    self.assertEquals(self.ls._list_owned(), set(['three']))
876

    
877
  def testEmptyAcquire(self):
878
    # Acquire an empty list of locks...
879
    self.assertEquals(self.ls.acquire([]), set())
880
    self.assertEquals(self.ls._list_owned(), set())
881
    # New locks can still be addded
882
    self.assert_(self.ls.add('six'))
883
    # "re-acquiring" is not an issue, since we had really acquired nothing
884
    self.assertEquals(self.ls.acquire([], shared=1), set())
885
    self.assertEquals(self.ls._list_owned(), set())
886
    # We haven't really acquired anything, so we cannot release
887
    self.assertRaises(AssertionError, self.ls.release)
888

    
889
  def _doLockSet(self, names, shared):
890
    try:
891
      self.ls.acquire(names, shared=shared)
892
      self.done.put('DONE')
893
      self.ls.release()
894
    except errors.LockError:
895
      self.done.put('ERR')
896

    
897
  def _doAddSet(self, names):
898
    try:
899
      self.ls.add(names, acquired=1)
900
      self.done.put('DONE')
901
      self.ls.release()
902
    except errors.LockError:
903
      self.done.put('ERR')
904

    
905
  def _doRemoveSet(self, names):
906
    self.done.put(self.ls.remove(names))
907

    
908
  @_Repeat
909
  def testConcurrentSharedAcquire(self):
910
    self.ls.acquire(['one', 'two'], shared=1)
911
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
912
    self._waitThreads()
913
    self.assertEqual(self.done.get_nowait(), 'DONE')
914
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
915
    self._waitThreads()
916
    self.assertEqual(self.done.get_nowait(), 'DONE')
917
    self._addThread(target=self._doLockSet, args=('three', 1))
918
    self._waitThreads()
919
    self.assertEqual(self.done.get_nowait(), 'DONE')
920
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
921
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
922
    self.assertRaises(Queue.Empty, self.done.get_nowait)
923
    self.ls.release()
924
    self._waitThreads()
925
    self.assertEqual(self.done.get_nowait(), 'DONE')
926
    self.assertEqual(self.done.get_nowait(), 'DONE')
927

    
928
  @_Repeat
929
  def testConcurrentExclusiveAcquire(self):
930
    self.ls.acquire(['one', 'two'])
931
    self._addThread(target=self._doLockSet, args=('three', 1))
932
    self._waitThreads()
933
    self.assertEqual(self.done.get_nowait(), 'DONE')
934
    self._addThread(target=self._doLockSet, args=('three', 0))
935
    self._waitThreads()
936
    self.assertEqual(self.done.get_nowait(), 'DONE')
937
    self.assertRaises(Queue.Empty, self.done.get_nowait)
938
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
939
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
940
    self._addThread(target=self._doLockSet, args=('one', 0))
941
    self._addThread(target=self._doLockSet, args=('one', 1))
942
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
943
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
944
    self.assertRaises(Queue.Empty, self.done.get_nowait)
945
    self.ls.release()
946
    self._waitThreads()
947
    for _ in range(6):
948
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
949

    
950
  @_Repeat
951
  def testConcurrentRemove(self):
952
    self.ls.add('four')
953
    self.ls.acquire(['one', 'two', 'four'])
954
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
955
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
956
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
957
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
958
    self.assertRaises(Queue.Empty, self.done.get_nowait)
959
    self.ls.remove('one')
960
    self.ls.release()
961
    self._waitThreads()
962
    for i in range(4):
963
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
964
    self.ls.add(['five', 'six'], acquired=1)
965
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
966
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
967
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
968
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
969
    self.ls.remove('five')
970
    self.ls.release()
971
    self._waitThreads()
972
    for i in range(4):
973
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
974
    self.ls.acquire(['three', 'four'])
975
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
976
    self.assertRaises(Queue.Empty, self.done.get_nowait)
977
    self.ls.remove('four')
978
    self._waitThreads()
979
    self.assertEqual(self.done.get_nowait(), ['six'])
980
    self._addThread(target=self._doRemoveSet, args=(['two']))
981
    self._waitThreads()
982
    self.assertEqual(self.done.get_nowait(), ['two'])
983
    self.ls.release()
984
    # reset lockset
985
    self._setUpLS()
986

    
987
  @_Repeat
988
  def testConcurrentSharedSetLock(self):
989
    # share the set-lock...
990
    self.ls.acquire(None, shared=1)
991
    # ...another thread can share it too
992
    self._addThread(target=self._doLockSet, args=(None, 1))
993
    self._waitThreads()
994
    self.assertEqual(self.done.get_nowait(), 'DONE')
995
    # ...or just share some elements
996
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
997
    self._waitThreads()
998
    self.assertEqual(self.done.get_nowait(), 'DONE')
999
    # ...but not add new ones or remove any
1000
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1001
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1002
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1003
    # this just releases the set-lock
1004
    self.ls.release([])
1005
    t.join(60)
1006
    self.assertEqual(self.done.get_nowait(), 'DONE')
1007
    # release the lock on the actual elements so remove() can proceed too
1008
    self.ls.release()
1009
    self._waitThreads()
1010
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1011
    # reset lockset
1012
    self._setUpLS()
1013

    
1014
  @_Repeat
1015
  def testConcurrentExclusiveSetLock(self):
1016
    # acquire the set-lock...
1017
    self.ls.acquire(None, shared=0)
1018
    # ...no one can do anything else
1019
    self._addThread(target=self._doLockSet, args=(None, 1))
1020
    self._addThread(target=self._doLockSet, args=(None, 0))
1021
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1022
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1023
    self._addThread(target=self._doAddSet, args=(['nine']))
1024
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1025
    self.ls.release()
1026
    self._waitThreads()
1027
    for _ in range(5):
1028
      self.assertEqual(self.done.get(True, 1), 'DONE')
1029
    # cleanup
1030
    self._setUpLS()
1031

    
1032
  @_Repeat
1033
  def testConcurrentSetLockAdd(self):
1034
    self.ls.acquire('one')
1035
    # Another thread wants the whole SetLock
1036
    self._addThread(target=self._doLockSet, args=(None, 0))
1037
    self._addThread(target=self._doLockSet, args=(None, 1))
1038
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1039
    self.assertRaises(AssertionError, self.ls.add, 'four')
1040
    self.ls.release()
1041
    self._waitThreads()
1042
    self.assertEqual(self.done.get_nowait(), 'DONE')
1043
    self.assertEqual(self.done.get_nowait(), 'DONE')
1044
    self.ls.acquire(None)
1045
    self._addThread(target=self._doLockSet, args=(None, 0))
1046
    self._addThread(target=self._doLockSet, args=(None, 1))
1047
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1048
    self.ls.add('four')
1049
    self.ls.add('five', acquired=1)
1050
    self.ls.add('six', acquired=1, shared=1)
1051
    self.assertEquals(self.ls._list_owned(),
1052
      set(['one', 'two', 'three', 'five', 'six']))
1053
    self.assertEquals(self.ls._is_owned(), True)
1054
    self.assertEquals(self.ls._names(),
1055
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1056
    self.ls.release()
1057
    self._waitThreads()
1058
    self.assertEqual(self.done.get_nowait(), 'DONE')
1059
    self.assertEqual(self.done.get_nowait(), 'DONE')
1060
    self._setUpLS()
1061

    
1062
  @_Repeat
1063
  def testEmptyLockSet(self):
1064
    # get the set-lock
1065
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1066
    # now empty it...
1067
    self.ls.remove(['one', 'two', 'three'])
1068
    # and adds/locks by another thread still wait
1069
    self._addThread(target=self._doAddSet, args=(['nine']))
1070
    self._addThread(target=self._doLockSet, args=(None, 1))
1071
    self._addThread(target=self._doLockSet, args=(None, 0))
1072
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1073
    self.ls.release()
1074
    self._waitThreads()
1075
    for _ in range(3):
1076
      self.assertEqual(self.done.get_nowait(), 'DONE')
1077
    # empty it again...
1078
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1079
    # now share it...
1080
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1081
    # other sharers can go, adds still wait
1082
    self._addThread(target=self._doLockSet, args=(None, 1))
1083
    self._waitThreads()
1084
    self.assertEqual(self.done.get_nowait(), 'DONE')
1085
    self._addThread(target=self._doAddSet, args=(['nine']))
1086
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1087
    self.ls.release()
1088
    self._waitThreads()
1089
    self.assertEqual(self.done.get_nowait(), 'DONE')
1090
    self._setUpLS()
1091

    
1092

    
1093
class TestGanetiLockManager(_ThreadedTestCase):
1094

    
1095
  def setUp(self):
1096
    _ThreadedTestCase.setUp(self)
1097
    self.nodes=['n1', 'n2']
1098
    self.instances=['i1', 'i2', 'i3']
1099
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1100
                                        instances=self.instances)
1101

    
1102
  def tearDown(self):
1103
    # Don't try this at home...
1104
    locking.GanetiLockManager._instance = None
1105

    
1106
  def testLockingConstants(self):
1107
    # The locking library internally cheats by assuming its constants have some
1108
    # relationships with each other. Check those hold true.
1109
    # This relationship is also used in the Processor to recursively acquire
1110
    # the right locks. Again, please don't break it.
1111
    for i in range(len(locking.LEVELS)):
1112
      self.assertEqual(i, locking.LEVELS[i])
1113

    
1114
  def testDoubleGLFails(self):
1115
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1116

    
1117
  def testLockNames(self):
1118
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1119
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1120
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1121
                     set(self.instances))
1122

    
1123
  def testInitAndResources(self):
1124
    locking.GanetiLockManager._instance = None
1125
    self.GL = locking.GanetiLockManager()
1126
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1127
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1128
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1129

    
1130
    locking.GanetiLockManager._instance = None
1131
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1132
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1133
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1134
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1135

    
1136
    locking.GanetiLockManager._instance = None
1137
    self.GL = locking.GanetiLockManager(instances=self.instances)
1138
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1139
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1140
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1141
                     set(self.instances))
1142

    
1143
  def testAcquireRelease(self):
1144
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1145
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1146
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1147
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1148
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1149
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1150
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1151
    self.GL.release(locking.LEVEL_NODE)
1152
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1153
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1154
    self.GL.release(locking.LEVEL_INSTANCE)
1155
    self.assertRaises(errors.LockError, self.GL.acquire,
1156
                      locking.LEVEL_INSTANCE, ['i5'])
1157
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1158
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1159

    
1160
  def testAcquireWholeSets(self):
1161
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1162
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1163
                      set(self.instances))
1164
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1165
                      set(self.instances))
1166
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1167
                      set(self.nodes))
1168
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1169
                      set(self.nodes))
1170
    self.GL.release(locking.LEVEL_NODE)
1171
    self.GL.release(locking.LEVEL_INSTANCE)
1172
    self.GL.release(locking.LEVEL_CLUSTER)
1173

    
1174
  def testAcquireWholeAndPartial(self):
1175
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1176
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1177
                      set(self.instances))
1178
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1179
                      set(self.instances))
1180
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1181
                      set(['n2']))
1182
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1183
                      set(['n2']))
1184
    self.GL.release(locking.LEVEL_NODE)
1185
    self.GL.release(locking.LEVEL_INSTANCE)
1186
    self.GL.release(locking.LEVEL_CLUSTER)
1187

    
1188
  def testBGLDependency(self):
1189
    self.assertRaises(AssertionError, self.GL.acquire,
1190
                      locking.LEVEL_NODE, ['n1', 'n2'])
1191
    self.assertRaises(AssertionError, self.GL.acquire,
1192
                      locking.LEVEL_INSTANCE, ['i3'])
1193
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1194
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1195
    self.assertRaises(AssertionError, self.GL.release,
1196
                      locking.LEVEL_CLUSTER, ['BGL'])
1197
    self.assertRaises(AssertionError, self.GL.release,
1198
                      locking.LEVEL_CLUSTER)
1199
    self.GL.release(locking.LEVEL_NODE)
1200
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1201
    self.assertRaises(AssertionError, self.GL.release,
1202
                      locking.LEVEL_CLUSTER, ['BGL'])
1203
    self.assertRaises(AssertionError, self.GL.release,
1204
                      locking.LEVEL_CLUSTER)
1205
    self.GL.release(locking.LEVEL_INSTANCE)
1206

    
1207
  def testWrongOrder(self):
1208
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1209
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1210
    self.assertRaises(AssertionError, self.GL.acquire,
1211
                      locking.LEVEL_NODE, ['n1'])
1212
    self.assertRaises(AssertionError, self.GL.acquire,
1213
                      locking.LEVEL_INSTANCE, ['i2'])
1214

    
1215
  # Helper function to run as a thread that shared the BGL and then acquires
1216
  # some locks at another level.
1217
  def _doLock(self, level, names, shared):
1218
    try:
1219
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1220
      self.GL.acquire(level, names, shared=shared)
1221
      self.done.put('DONE')
1222
      self.GL.release(level)
1223
      self.GL.release(locking.LEVEL_CLUSTER)
1224
    except errors.LockError:
1225
      self.done.put('ERR')
1226

    
1227
  @_Repeat
1228
  def testConcurrency(self):
1229
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1230
    self._addThread(target=self._doLock,
1231
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1232
    self._waitThreads()
1233
    self.assertEqual(self.done.get_nowait(), 'DONE')
1234
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1235
    self._addThread(target=self._doLock,
1236
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1237
    self._waitThreads()
1238
    self.assertEqual(self.done.get_nowait(), 'DONE')
1239
    self._addThread(target=self._doLock,
1240
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1241
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1242
    self.GL.release(locking.LEVEL_INSTANCE)
1243
    self._waitThreads()
1244
    self.assertEqual(self.done.get_nowait(), 'DONE')
1245
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1246
    self._addThread(target=self._doLock,
1247
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1248
    self._waitThreads()
1249
    self.assertEqual(self.done.get_nowait(), 'DONE')
1250
    self._addThread(target=self._doLock,
1251
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1252
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1253
    self.GL.release(locking.LEVEL_INSTANCE)
1254
    self._waitThreads()
1255
    self.assertEqual(self.done.get(True, 1), 'DONE')
1256
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1257

    
1258

    
1259
if __name__ == '__main__':
1260
  unittest.main()
1261
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1262
  #unittest.TextTestRunner(verbosity=2).run(suite)