Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ d76167a5

History | View | Annotate | Download (39.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.threads = []
56

    
57
  def _addThread(self, *args, **kwargs):
58
    """Create and remember a new thread"""
59
    t = threading.Thread(*args, **kwargs)
60
    self.threads.append(t)
61
    t.start()
62
    return t
63

    
64
  def _waitThreads(self):
65
    """Wait for all our threads to finish"""
66
    for t in self.threads:
67
      t.join(60)
68
      self.failIf(t.isAlive())
69
    self.threads = []
70

    
71

    
72
class TestSingleActionPipeCondition(unittest.TestCase):
73
  """_SingleActionPipeCondition tests"""
74

    
75
  def setUp(self):
76
    self.cond = locking._SingleActionPipeCondition()
77

    
78
  def testInitialization(self):
79
    self.assert_(self.cond._read_fd is not None)
80
    self.assert_(self.cond._write_fd is not None)
81
    self.assert_(self.cond._poller is not None)
82
    self.assertEqual(self.cond._nwaiters, 0)
83

    
84
  def testUsageCount(self):
85
    self.cond.StartWaiting()
86
    self.assert_(self.cond._read_fd is not None)
87
    self.assert_(self.cond._write_fd is not None)
88
    self.assert_(self.cond._poller is not None)
89
    self.assertEqual(self.cond._nwaiters, 1)
90

    
91
    # use again
92
    self.cond.StartWaiting()
93
    self.assertEqual(self.cond._nwaiters, 2)
94

    
95
    # there is more than one user
96
    self.assert_(not self.cond.DoneWaiting())
97
    self.assert_(self.cond._read_fd is not None)
98
    self.assert_(self.cond._write_fd is not None)
99
    self.assert_(self.cond._poller is not None)
100
    self.assertEqual(self.cond._nwaiters, 1)
101

    
102
    self.assert_(self.cond.DoneWaiting())
103
    self.assertEqual(self.cond._nwaiters, 0)
104
    self.assert_(self.cond._read_fd is None)
105
    self.assert_(self.cond._write_fd is None)
106
    self.assert_(self.cond._poller is None)
107

    
108
  def testNotify(self):
109
    wait1 = self.cond.StartWaiting()
110
    wait2 = self.cond.StartWaiting()
111

    
112
    self.assert_(self.cond._read_fd is not None)
113
    self.assert_(self.cond._write_fd is not None)
114
    self.assert_(self.cond._poller is not None)
115

    
116
    self.cond.notifyAll()
117

    
118
    self.assert_(self.cond._read_fd is not None)
119
    self.assert_(self.cond._write_fd is None)
120
    self.assert_(self.cond._poller is not None)
121

    
122
    self.assert_(not self.cond.DoneWaiting())
123

    
124
    self.assert_(self.cond._read_fd is not None)
125
    self.assert_(self.cond._write_fd is None)
126
    self.assert_(self.cond._poller is not None)
127

    
128
    self.assert_(self.cond.DoneWaiting())
129

    
130
    self.assert_(self.cond._read_fd is None)
131
    self.assert_(self.cond._write_fd is None)
132
    self.assert_(self.cond._poller is None)
133

    
134
  def testReusage(self):
135
    self.cond.StartWaiting()
136
    self.assert_(self.cond._read_fd is not None)
137
    self.assert_(self.cond._write_fd is not None)
138
    self.assert_(self.cond._poller is not None)
139

    
140
    self.assert_(self.cond.DoneWaiting())
141

    
142
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
143
    self.assert_(self.cond._read_fd is None)
144
    self.assert_(self.cond._write_fd is None)
145
    self.assert_(self.cond._poller is None)
146

    
147
  def testNotifyTwice(self):
148
    self.cond.notifyAll()
149
    self.assertRaises(RuntimeError, self.cond.notifyAll)
150

    
151

    
152
class TestSharedLock(_ThreadedTestCase):
153
  """SharedLock tests"""
154

    
155
  def setUp(self):
156
    _ThreadedTestCase.setUp(self)
157
    self.sl = locking.SharedLock()
158
    # helper threads use the 'done' queue to tell the master they finished.
159
    self.done = Queue.Queue(0)
160

    
161
  def testSequenceAndOwnership(self):
162
    self.assert_(not self.sl._is_owned())
163
    self.sl.acquire(shared=1)
164
    self.assert_(self.sl._is_owned())
165
    self.assert_(self.sl._is_owned(shared=1))
166
    self.assert_(not self.sl._is_owned(shared=0))
167
    self.sl.release()
168
    self.assert_(not self.sl._is_owned())
169
    self.sl.acquire()
170
    self.assert_(self.sl._is_owned())
171
    self.assert_(not self.sl._is_owned(shared=1))
172
    self.assert_(self.sl._is_owned(shared=0))
173
    self.sl.release()
174
    self.assert_(not self.sl._is_owned())
175
    self.sl.acquire(shared=1)
176
    self.assert_(self.sl._is_owned())
177
    self.assert_(self.sl._is_owned(shared=1))
178
    self.assert_(not self.sl._is_owned(shared=0))
179
    self.sl.release()
180
    self.assert_(not self.sl._is_owned())
181

    
182
  def testBooleanValue(self):
183
    # semaphores are supposed to return a true value on a successful acquire
184
    self.assert_(self.sl.acquire(shared=1))
185
    self.sl.release()
186
    self.assert_(self.sl.acquire())
187
    self.sl.release()
188

    
189
  def testDoubleLockingStoE(self):
190
    self.sl.acquire(shared=1)
191
    self.assertRaises(AssertionError, self.sl.acquire)
192

    
193
  def testDoubleLockingEtoS(self):
194
    self.sl.acquire()
195
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
196

    
197
  def testDoubleLockingStoS(self):
198
    self.sl.acquire(shared=1)
199
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
200

    
201
  def testDoubleLockingEtoE(self):
202
    self.sl.acquire()
203
    self.assertRaises(AssertionError, self.sl.acquire)
204

    
205
  # helper functions: called in a separate thread they acquire the lock, send
206
  # their identifier on the done queue, then release it.
207
  def _doItSharer(self):
208
    try:
209
      self.sl.acquire(shared=1)
210
      self.done.put('SHR')
211
      self.sl.release()
212
    except errors.LockError:
213
      self.done.put('ERR')
214

    
215
  def _doItExclusive(self):
216
    try:
217
      self.sl.acquire()
218
      self.done.put('EXC')
219
      self.sl.release()
220
    except errors.LockError:
221
      self.done.put('ERR')
222

    
223
  def _doItDelete(self):
224
    try:
225
      self.sl.delete()
226
      self.done.put('DEL')
227
    except errors.LockError:
228
      self.done.put('ERR')
229

    
230
  def testSharersCanCoexist(self):
231
    self.sl.acquire(shared=1)
232
    threading.Thread(target=self._doItSharer).start()
233
    self.assert_(self.done.get(True, 1))
234
    self.sl.release()
235

    
236
  @_Repeat
237
  def testExclusiveBlocksExclusive(self):
238
    self.sl.acquire()
239
    self._addThread(target=self._doItExclusive)
240
    self.assertRaises(Queue.Empty, self.done.get_nowait)
241
    self.sl.release()
242
    self._waitThreads()
243
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
244

    
245
  @_Repeat
246
  def testExclusiveBlocksDelete(self):
247
    self.sl.acquire()
248
    self._addThread(target=self._doItDelete)
249
    self.assertRaises(Queue.Empty, self.done.get_nowait)
250
    self.sl.release()
251
    self._waitThreads()
252
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
253
    self.sl = locking.SharedLock()
254

    
255
  @_Repeat
256
  def testExclusiveBlocksSharer(self):
257
    self.sl.acquire()
258
    self._addThread(target=self._doItSharer)
259
    self.assertRaises(Queue.Empty, self.done.get_nowait)
260
    self.sl.release()
261
    self._waitThreads()
262
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
263

    
264
  @_Repeat
265
  def testSharerBlocksExclusive(self):
266
    self.sl.acquire(shared=1)
267
    self._addThread(target=self._doItExclusive)
268
    self.assertRaises(Queue.Empty, self.done.get_nowait)
269
    self.sl.release()
270
    self._waitThreads()
271
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
272

    
273
  @_Repeat
274
  def testSharerBlocksDelete(self):
275
    self.sl.acquire(shared=1)
276
    self._addThread(target=self._doItDelete)
277
    self.assertRaises(Queue.Empty, self.done.get_nowait)
278
    self.sl.release()
279
    self._waitThreads()
280
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
281
    self.sl = locking.SharedLock()
282

    
283
  @_Repeat
284
  def testWaitingExclusiveBlocksSharer(self):
285
    """SKIPPED testWaitingExclusiveBlockSharer"""
286
    return
287

    
288
    self.sl.acquire(shared=1)
289
    # the lock is acquired in shared mode...
290
    self._addThread(target=self._doItExclusive)
291
    # ...but now an exclusive is waiting...
292
    self._addThread(target=self._doItSharer)
293
    # ...so the sharer should be blocked as well
294
    self.assertRaises(Queue.Empty, self.done.get_nowait)
295
    self.sl.release()
296
    self._waitThreads()
297
    # The exclusive passed before
298
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
299
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
300

    
301
  @_Repeat
302
  def testWaitingSharerBlocksExclusive(self):
303
    """SKIPPED testWaitingSharerBlocksExclusive"""
304
    return
305

    
306
    self.sl.acquire()
307
    # the lock is acquired in exclusive mode...
308
    self._addThread(target=self._doItSharer)
309
    # ...but now a sharer is waiting...
310
    self._addThread(target=self._doItExclusive)
311
    # ...the exclusive is waiting too...
312
    self.assertRaises(Queue.Empty, self.done.get_nowait)
313
    self.sl.release()
314
    self._waitThreads()
315
    # The sharer passed before
316
    self.assertEqual(self.done.get_nowait(), 'SHR')
317
    self.assertEqual(self.done.get_nowait(), 'EXC')
318

    
319
  def testDelete(self):
320
    self.sl.delete()
321
    self.assertRaises(errors.LockError, self.sl.acquire)
322
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
323
    self.assertRaises(errors.LockError, self.sl.delete)
324

    
325
  def testNoDeleteIfSharer(self):
326
    self.sl.acquire(shared=1)
327
    self.assertRaises(AssertionError, self.sl.delete)
328

    
329
  @_Repeat
330
  def testDeletePendingSharersExclusiveDelete(self):
331
    self.sl.acquire()
332
    self._addThread(target=self._doItSharer)
333
    self._addThread(target=self._doItSharer)
334
    self._addThread(target=self._doItExclusive)
335
    self._addThread(target=self._doItDelete)
336
    self.sl.delete()
337
    self._waitThreads()
338
    # The threads who were pending return ERR
339
    for _ in range(4):
340
      self.assertEqual(self.done.get_nowait(), 'ERR')
341
    self.sl = locking.SharedLock()
342

    
343
  @_Repeat
344
  def testDeletePendingDeleteExclusiveSharers(self):
345
    self.sl.acquire()
346
    self._addThread(target=self._doItDelete)
347
    self._addThread(target=self._doItExclusive)
348
    self._addThread(target=self._doItSharer)
349
    self._addThread(target=self._doItSharer)
350
    self.sl.delete()
351
    self._waitThreads()
352
    # The two threads who were pending return both ERR
353
    self.assertEqual(self.done.get_nowait(), 'ERR')
354
    self.assertEqual(self.done.get_nowait(), 'ERR')
355
    self.assertEqual(self.done.get_nowait(), 'ERR')
356
    self.assertEqual(self.done.get_nowait(), 'ERR')
357
    self.sl = locking.SharedLock()
358

    
359
  @_Repeat
360
  def testExclusiveAcquireTimeout(self):
361
    def _LockExclusive(wait):
362
      self.sl.acquire(shared=0)
363
      self.done.put("A: start sleep")
364
      time.sleep(wait)
365
      self.done.put("A: end sleep")
366
      self.sl.release()
367

    
368
    for shared in [0, 1]:
369
      # Start thread to hold lock for 20 ms
370
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
371

    
372
      # Wait up to 100 ms to get lock
373
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
374
      self.done.put("got 2nd")
375
      self.sl.release()
376

    
377
      self._waitThreads()
378

    
379
      self.assertEqual(self.done.get_nowait(), "A: start sleep")
380
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
381
      self.assertEqual(self.done.get_nowait(), "got 2nd")
382
      self.assertRaises(Queue.Empty, self.done.get_nowait)
383

    
384
  @_Repeat
385
  def testAcquireExpiringTimeout(self):
386
    def _AcquireWithTimeout(shared, timeout):
387
      if not self.sl.acquire(shared=shared, timeout=timeout):
388
        self.done.put("timeout")
389

    
390
    for shared in [0, 1]:
391
      # Lock exclusively
392
      self.sl.acquire()
393

    
394
      # Start shared acquires with timeout between 0 and 20 ms
395
      for i in xrange(11):
396
        self._addThread(target=_AcquireWithTimeout,
397
                        args=(shared, i * 2.0 / 1000.0))
398

    
399
      # Wait for threads to finish (makes sure the acquire timeout expires
400
      # before releasing the lock)
401
      self._waitThreads()
402

    
403
      # Release lock
404
      self.sl.release()
405

    
406
      for _ in xrange(11):
407
        self.assertEqual(self.done.get_nowait(), "timeout")
408

    
409
      self.assertRaises(Queue.Empty, self.done.get_nowait)
410

    
411
  @_Repeat
412
  def testSharedSkipExclusiveAcquires(self):
413
    # Tests whether shared acquires jump in front of exclusive acquires in the
414
    # queue.
415

    
416
    # Get exclusive lock while we fill the queue
417
    self.sl.acquire()
418

    
419
    def _Acquire(shared, name):
420
      if not self.sl.acquire(shared=shared):
421
        return
422

    
423
      self.done.put(name)
424
      self.sl.release()
425

    
426
    # Start shared acquires
427
    for _ in xrange(5):
428
      self._addThread(target=_Acquire, args=(1, "shared A"))
429

    
430
    # Start exclusive acquires
431
    for _ in xrange(3):
432
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
433

    
434
    # More shared acquires
435
    for _ in xrange(5):
436
      self._addThread(target=_Acquire, args=(1, "shared C"))
437

    
438
    # More exclusive acquires
439
    for _ in xrange(3):
440
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
441

    
442
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
443
    # together
444
    self.assertEqual(self.sl._count_pending(), 7)
445

    
446
    # Release exclusive lock and wait
447
    self.sl.release()
448

    
449
    self._waitThreads()
450

    
451
    # Check sequence
452
    for _ in xrange(10):
453
      # Shared locks aren't guaranteed to be notified in order, but they'll be
454
      # first
455
      self.assert_(self.done.get_nowait() in ("shared A", "shared C"))
456

    
457
    for _ in xrange(3):
458
      self.assertEqual(self.done.get_nowait(), "exclusive B")
459

    
460
    for _ in xrange(3):
461
      self.assertEqual(self.done.get_nowait(), "exclusive D")
462

    
463
    self.assertRaises(Queue.Empty, self.done.get_nowait)
464

    
465
  @_Repeat
466
  def testMixedAcquireTimeout(self):
467
    sync = threading.Condition()
468

    
469
    def _AcquireShared(ev):
470
      if not self.sl.acquire(shared=1, timeout=None):
471
        return
472

    
473
      self.done.put("shared")
474

    
475
      # Notify main thread
476
      ev.set()
477

    
478
      # Wait for notification
479
      sync.acquire()
480
      try:
481
        sync.wait()
482
      finally:
483
        sync.release()
484

    
485
      # Release lock
486
      self.sl.release()
487

    
488
    acquires = []
489
    for _ in xrange(3):
490
      ev = threading.Event()
491
      self._addThread(target=_AcquireShared, args=(ev, ))
492
      acquires.append(ev)
493

    
494
    # Wait for all acquires to finish
495
    for i in acquires:
496
      i.wait()
497

    
498
    self.assertEqual(self.sl._count_pending(), 0)
499

    
500
    # Try to get exclusive lock
501
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
502

    
503
    # Acquire exclusive without timeout
504
    exclsync = threading.Condition()
505
    exclev = threading.Event()
506

    
507
    def _AcquireExclusive():
508
      if not self.sl.acquire(shared=0):
509
        return
510

    
511
      self.done.put("exclusive")
512

    
513
      # Notify main thread
514
      exclev.set()
515

    
516
      exclsync.acquire()
517
      try:
518
        exclsync.wait()
519
      finally:
520
        exclsync.release()
521

    
522
      self.sl.release()
523

    
524
    self._addThread(target=_AcquireExclusive)
525

    
526
    # Try to get exclusive lock
527
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
528

    
529
    # Make all shared holders release their locks
530
    sync.acquire()
531
    try:
532
      sync.notifyAll()
533
    finally:
534
      sync.release()
535

    
536
    # Wait for exclusive acquire to succeed
537
    exclev.wait()
538

    
539
    self.assertEqual(self.sl._count_pending(), 0)
540

    
541
    # Try to get exclusive lock
542
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
543

    
544
    def _AcquireSharedSimple():
545
      if self.sl.acquire(shared=1, timeout=None):
546
        self.done.put("shared2")
547
        self.sl.release()
548

    
549
    for _ in xrange(10):
550
      self._addThread(target=_AcquireSharedSimple)
551

    
552
    # Tell exclusive lock to release
553
    exclsync.acquire()
554
    try:
555
      exclsync.notifyAll()
556
    finally:
557
      exclsync.release()
558

    
559
    # Wait for everything to finish
560
    self._waitThreads()
561

    
562
    self.assertEqual(self.sl._count_pending(), 0)
563

    
564
    # Check sequence
565
    for _ in xrange(3):
566
      self.assertEqual(self.done.get_nowait(), "shared")
567

    
568
    self.assertEqual(self.done.get_nowait(), "exclusive")
569

    
570
    for _ in xrange(10):
571
      self.assertEqual(self.done.get_nowait(), "shared2")
572

    
573
    self.assertRaises(Queue.Empty, self.done.get_nowait)
574

    
575

    
576
class TestSSynchronizedDecorator(_ThreadedTestCase):
577
  """Shared Lock Synchronized decorator test"""
578

    
579
  def setUp(self):
580
    _ThreadedTestCase.setUp(self)
581
    # helper threads use the 'done' queue to tell the master they finished.
582
    self.done = Queue.Queue(0)
583

    
584
  @locking.ssynchronized(_decoratorlock)
585
  def _doItExclusive(self):
586
    self.assert_(_decoratorlock._is_owned())
587
    self.done.put('EXC')
588

    
589
  @locking.ssynchronized(_decoratorlock, shared=1)
590
  def _doItSharer(self):
591
    self.assert_(_decoratorlock._is_owned(shared=1))
592
    self.done.put('SHR')
593

    
594
  def testDecoratedFunctions(self):
595
    self._doItExclusive()
596
    self.assert_(not _decoratorlock._is_owned())
597
    self._doItSharer()
598
    self.assert_(not _decoratorlock._is_owned())
599

    
600
  def testSharersCanCoexist(self):
601
    _decoratorlock.acquire(shared=1)
602
    threading.Thread(target=self._doItSharer).start()
603
    self.assert_(self.done.get(True, 1))
604
    _decoratorlock.release()
605

    
606
  @_Repeat
607
  def testExclusiveBlocksExclusive(self):
608
    _decoratorlock.acquire()
609
    self._addThread(target=self._doItExclusive)
610
    # give it a bit of time to check that it's not actually doing anything
611
    self.assertRaises(Queue.Empty, self.done.get_nowait)
612
    _decoratorlock.release()
613
    self._waitThreads()
614
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
615

    
616
  @_Repeat
617
  def testExclusiveBlocksSharer(self):
618
    _decoratorlock.acquire()
619
    self._addThread(target=self._doItSharer)
620
    self.assertRaises(Queue.Empty, self.done.get_nowait)
621
    _decoratorlock.release()
622
    self._waitThreads()
623
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
624

    
625
  @_Repeat
626
  def testSharerBlocksExclusive(self):
627
    _decoratorlock.acquire(shared=1)
628
    self._addThread(target=self._doItExclusive)
629
    self.assertRaises(Queue.Empty, self.done.get_nowait)
630
    _decoratorlock.release()
631
    self._waitThreads()
632
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
633

    
634

    
635
class TestLockSet(_ThreadedTestCase):
636
  """LockSet tests"""
637

    
638
  def setUp(self):
639
    _ThreadedTestCase.setUp(self)
640
    self._setUpLS()
641
    # helper threads use the 'done' queue to tell the master they finished.
642
    self.done = Queue.Queue(0)
643

    
644
  def _setUpLS(self):
645
    """Helper to (re)initialize the lock set"""
646
    self.resources = ['one', 'two', 'three']
647
    self.ls = locking.LockSet(members=self.resources)
648

    
649
  def testResources(self):
650
    self.assertEquals(self.ls._names(), set(self.resources))
651
    newls = locking.LockSet()
652
    self.assertEquals(newls._names(), set())
653

    
654
  def testAcquireRelease(self):
655
    self.assert_(self.ls.acquire('one'))
656
    self.assertEquals(self.ls._list_owned(), set(['one']))
657
    self.ls.release()
658
    self.assertEquals(self.ls._list_owned(), set())
659
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
660
    self.assertEquals(self.ls._list_owned(), set(['one']))
661
    self.ls.release()
662
    self.assertEquals(self.ls._list_owned(), set())
663
    self.ls.acquire(['one', 'two', 'three'])
664
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
665
    self.ls.release('one')
666
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
667
    self.ls.release(['three'])
668
    self.assertEquals(self.ls._list_owned(), set(['two']))
669
    self.ls.release()
670
    self.assertEquals(self.ls._list_owned(), set())
671
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
672
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
673
    self.ls.release()
674
    self.assertEquals(self.ls._list_owned(), set())
675

    
676
  def testNoDoubleAcquire(self):
677
    self.ls.acquire('one')
678
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
679
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
680
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
681
    self.ls.release()
682
    self.ls.acquire(['one', 'three'])
683
    self.ls.release('one')
684
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
685
    self.ls.release('three')
686

    
687
  def testNoWrongRelease(self):
688
    self.assertRaises(AssertionError, self.ls.release)
689
    self.ls.acquire('one')
690
    self.assertRaises(AssertionError, self.ls.release, 'two')
691

    
692
  def testAddRemove(self):
693
    self.ls.add('four')
694
    self.assertEquals(self.ls._list_owned(), set())
695
    self.assert_('four' in self.ls._names())
696
    self.ls.add(['five', 'six', 'seven'], acquired=1)
697
    self.assert_('five' in self.ls._names())
698
    self.assert_('six' in self.ls._names())
699
    self.assert_('seven' in self.ls._names())
700
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
701
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
702
    self.assert_('five' not in self.ls._names())
703
    self.assert_('six' not in self.ls._names())
704
    self.assertEquals(self.ls._list_owned(), set(['seven']))
705
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
706
    self.ls.remove('seven')
707
    self.assert_('seven' not in self.ls._names())
708
    self.assertEquals(self.ls._list_owned(), set([]))
709
    self.ls.acquire(None, shared=1)
710
    self.assertRaises(AssertionError, self.ls.add, 'eight')
711
    self.ls.release()
712
    self.ls.acquire(None)
713
    self.ls.add('eight', acquired=1)
714
    self.assert_('eight' in self.ls._names())
715
    self.assert_('eight' in self.ls._list_owned())
716
    self.ls.add('nine')
717
    self.assert_('nine' in self.ls._names())
718
    self.assert_('nine' not in self.ls._list_owned())
719
    self.ls.release()
720
    self.ls.remove(['two'])
721
    self.assert_('two' not in self.ls._names())
722
    self.ls.acquire('three')
723
    self.assertEquals(self.ls.remove(['three']), ['three'])
724
    self.assert_('three' not in self.ls._names())
725
    self.assertEquals(self.ls.remove('three'), [])
726
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
727
    self.assert_('one' not in self.ls._names())
728

    
729
  def testRemoveNonBlocking(self):
730
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
731
    self.ls.acquire('one')
732
    self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
733
    self.ls.acquire(['two', 'three'])
734
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
735
                      ['two', 'three'])
736

    
737
  def testNoDoubleAdd(self):
738
    self.assertRaises(errors.LockError, self.ls.add, 'two')
739
    self.ls.add('four')
740
    self.assertRaises(errors.LockError, self.ls.add, 'four')
741

    
742
  def testNoWrongRemoves(self):
743
    self.ls.acquire(['one', 'three'], shared=1)
744
    # Cannot remove 'two' while holding something which is not a superset
745
    self.assertRaises(AssertionError, self.ls.remove, 'two')
746
    # Cannot remove 'three' as we are sharing it
747
    self.assertRaises(AssertionError, self.ls.remove, 'three')
748

    
749
  def testAcquireSetLock(self):
750
    # acquire the set-lock exclusively
751
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
752
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
753
    self.assertEquals(self.ls._is_owned(), True)
754
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
755
    # I can still add/remove elements...
756
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
757
    self.assert_(self.ls.add('six'))
758
    self.ls.release()
759
    # share the set-lock
760
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
761
    # adding new elements is not possible
762
    self.assertRaises(AssertionError, self.ls.add, 'five')
763
    self.ls.release()
764

    
765
  def testAcquireWithRepetitions(self):
766
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
767
                      set(['two', 'two', 'three']))
768
    self.ls.release(['two', 'two'])
769
    self.assertEquals(self.ls._list_owned(), set(['three']))
770

    
771
  def testEmptyAcquire(self):
772
    # Acquire an empty list of locks...
773
    self.assertEquals(self.ls.acquire([]), set())
774
    self.assertEquals(self.ls._list_owned(), set())
775
    # New locks can still be addded
776
    self.assert_(self.ls.add('six'))
777
    # "re-acquiring" is not an issue, since we had really acquired nothing
778
    self.assertEquals(self.ls.acquire([], shared=1), set())
779
    self.assertEquals(self.ls._list_owned(), set())
780
    # We haven't really acquired anything, so we cannot release
781
    self.assertRaises(AssertionError, self.ls.release)
782

    
783
  def _doLockSet(self, names, shared):
784
    try:
785
      self.ls.acquire(names, shared=shared)
786
      self.done.put('DONE')
787
      self.ls.release()
788
    except errors.LockError:
789
      self.done.put('ERR')
790

    
791
  def _doAddSet(self, names):
792
    try:
793
      self.ls.add(names, acquired=1)
794
      self.done.put('DONE')
795
      self.ls.release()
796
    except errors.LockError:
797
      self.done.put('ERR')
798

    
799
  def _doRemoveSet(self, names):
800
    self.done.put(self.ls.remove(names))
801

    
802
  @_Repeat
803
  def testConcurrentSharedAcquire(self):
804
    self.ls.acquire(['one', 'two'], shared=1)
805
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
806
    self._waitThreads()
807
    self.assertEqual(self.done.get_nowait(), 'DONE')
808
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
809
    self._waitThreads()
810
    self.assertEqual(self.done.get_nowait(), 'DONE')
811
    self._addThread(target=self._doLockSet, args=('three', 1))
812
    self._waitThreads()
813
    self.assertEqual(self.done.get_nowait(), 'DONE')
814
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
815
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
816
    self.assertRaises(Queue.Empty, self.done.get_nowait)
817
    self.ls.release()
818
    self._waitThreads()
819
    self.assertEqual(self.done.get_nowait(), 'DONE')
820
    self.assertEqual(self.done.get_nowait(), 'DONE')
821

    
822
  @_Repeat
823
  def testConcurrentExclusiveAcquire(self):
824
    self.ls.acquire(['one', 'two'])
825
    self._addThread(target=self._doLockSet, args=('three', 1))
826
    self._waitThreads()
827
    self.assertEqual(self.done.get_nowait(), 'DONE')
828
    self._addThread(target=self._doLockSet, args=('three', 0))
829
    self._waitThreads()
830
    self.assertEqual(self.done.get_nowait(), 'DONE')
831
    self.assertRaises(Queue.Empty, self.done.get_nowait)
832
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
833
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
834
    self._addThread(target=self._doLockSet, args=('one', 0))
835
    self._addThread(target=self._doLockSet, args=('one', 1))
836
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
837
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
838
    self.assertRaises(Queue.Empty, self.done.get_nowait)
839
    self.ls.release()
840
    self._waitThreads()
841
    for _ in range(6):
842
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
843

    
844
  @_Repeat
845
  def testConcurrentRemove(self):
846
    self.ls.add('four')
847
    self.ls.acquire(['one', 'two', 'four'])
848
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
849
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
850
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
851
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
852
    self.assertRaises(Queue.Empty, self.done.get_nowait)
853
    self.ls.remove('one')
854
    self.ls.release()
855
    self._waitThreads()
856
    for i in range(4):
857
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
858
    self.ls.add(['five', 'six'], acquired=1)
859
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
860
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
861
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
862
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
863
    self.ls.remove('five')
864
    self.ls.release()
865
    self._waitThreads()
866
    for i in range(4):
867
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
868
    self.ls.acquire(['three', 'four'])
869
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
870
    self.assertRaises(Queue.Empty, self.done.get_nowait)
871
    self.ls.remove('four')
872
    self._waitThreads()
873
    self.assertEqual(self.done.get_nowait(), ['six'])
874
    self._addThread(target=self._doRemoveSet, args=(['two']))
875
    self._waitThreads()
876
    self.assertEqual(self.done.get_nowait(), ['two'])
877
    self.ls.release()
878
    # reset lockset
879
    self._setUpLS()
880

    
881
  @_Repeat
882
  def testConcurrentSharedSetLock(self):
883
    # share the set-lock...
884
    self.ls.acquire(None, shared=1)
885
    # ...another thread can share it too
886
    self._addThread(target=self._doLockSet, args=(None, 1))
887
    self._waitThreads()
888
    self.assertEqual(self.done.get_nowait(), 'DONE')
889
    # ...or just share some elements
890
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
891
    self._waitThreads()
892
    self.assertEqual(self.done.get_nowait(), 'DONE')
893
    # ...but not add new ones or remove any
894
    t = self._addThread(target=self._doAddSet, args=(['nine']))
895
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
896
    self.assertRaises(Queue.Empty, self.done.get_nowait)
897
    # this just releases the set-lock
898
    self.ls.release([])
899
    t.join(60)
900
    self.assertEqual(self.done.get_nowait(), 'DONE')
901
    # release the lock on the actual elements so remove() can proceed too
902
    self.ls.release()
903
    self._waitThreads()
904
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
905
    # reset lockset
906
    self._setUpLS()
907

    
908
  @_Repeat
909
  def testConcurrentExclusiveSetLock(self):
910
    # acquire the set-lock...
911
    self.ls.acquire(None, shared=0)
912
    # ...no one can do anything else
913
    self._addThread(target=self._doLockSet, args=(None, 1))
914
    self._addThread(target=self._doLockSet, args=(None, 0))
915
    self._addThread(target=self._doLockSet, args=(['three'], 0))
916
    self._addThread(target=self._doLockSet, args=(['two'], 1))
917
    self._addThread(target=self._doAddSet, args=(['nine']))
918
    self.assertRaises(Queue.Empty, self.done.get_nowait)
919
    self.ls.release()
920
    self._waitThreads()
921
    for _ in range(5):
922
      self.assertEqual(self.done.get(True, 1), 'DONE')
923
    # cleanup
924
    self._setUpLS()
925

    
926
  @_Repeat
927
  def testConcurrentSetLockAdd(self):
928
    self.ls.acquire('one')
929
    # Another thread wants the whole SetLock
930
    self._addThread(target=self._doLockSet, args=(None, 0))
931
    self._addThread(target=self._doLockSet, args=(None, 1))
932
    self.assertRaises(Queue.Empty, self.done.get_nowait)
933
    self.assertRaises(AssertionError, self.ls.add, 'four')
934
    self.ls.release()
935
    self._waitThreads()
936
    self.assertEqual(self.done.get_nowait(), 'DONE')
937
    self.assertEqual(self.done.get_nowait(), 'DONE')
938
    self.ls.acquire(None)
939
    self._addThread(target=self._doLockSet, args=(None, 0))
940
    self._addThread(target=self._doLockSet, args=(None, 1))
941
    self.assertRaises(Queue.Empty, self.done.get_nowait)
942
    self.ls.add('four')
943
    self.ls.add('five', acquired=1)
944
    self.ls.add('six', acquired=1, shared=1)
945
    self.assertEquals(self.ls._list_owned(),
946
      set(['one', 'two', 'three', 'five', 'six']))
947
    self.assertEquals(self.ls._is_owned(), True)
948
    self.assertEquals(self.ls._names(),
949
      set(['one', 'two', 'three', 'four', 'five', 'six']))
950
    self.ls.release()
951
    self._waitThreads()
952
    self.assertEqual(self.done.get_nowait(), 'DONE')
953
    self.assertEqual(self.done.get_nowait(), 'DONE')
954
    self._setUpLS()
955

    
956
  @_Repeat
957
  def testEmptyLockSet(self):
958
    # get the set-lock
959
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
960
    # now empty it...
961
    self.ls.remove(['one', 'two', 'three'])
962
    # and adds/locks by another thread still wait
963
    self._addThread(target=self._doAddSet, args=(['nine']))
964
    self._addThread(target=self._doLockSet, args=(None, 1))
965
    self._addThread(target=self._doLockSet, args=(None, 0))
966
    self.assertRaises(Queue.Empty, self.done.get_nowait)
967
    self.ls.release()
968
    self._waitThreads()
969
    for _ in range(3):
970
      self.assertEqual(self.done.get_nowait(), 'DONE')
971
    # empty it again...
972
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
973
    # now share it...
974
    self.assertEqual(self.ls.acquire(None, shared=1), set())
975
    # other sharers can go, adds still wait
976
    self._addThread(target=self._doLockSet, args=(None, 1))
977
    self._waitThreads()
978
    self.assertEqual(self.done.get_nowait(), 'DONE')
979
    self._addThread(target=self._doAddSet, args=(['nine']))
980
    self.assertRaises(Queue.Empty, self.done.get_nowait)
981
    self.ls.release()
982
    self._waitThreads()
983
    self.assertEqual(self.done.get_nowait(), 'DONE')
984
    self._setUpLS()
985

    
986

    
987
class TestGanetiLockManager(_ThreadedTestCase):
988

    
989
  def setUp(self):
990
    _ThreadedTestCase.setUp(self)
991
    self.nodes=['n1', 'n2']
992
    self.instances=['i1', 'i2', 'i3']
993
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
994
                                        instances=self.instances)
995
    self.done = Queue.Queue(0)
996

    
997
  def tearDown(self):
998
    # Don't try this at home...
999
    locking.GanetiLockManager._instance = None
1000

    
1001
  def testLockingConstants(self):
1002
    # The locking library internally cheats by assuming its constants have some
1003
    # relationships with each other. Check those hold true.
1004
    # This relationship is also used in the Processor to recursively acquire
1005
    # the right locks. Again, please don't break it.
1006
    for i in range(len(locking.LEVELS)):
1007
      self.assertEqual(i, locking.LEVELS[i])
1008

    
1009
  def testDoubleGLFails(self):
1010
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1011

    
1012
  def testLockNames(self):
1013
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1014
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1015
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1016
                     set(self.instances))
1017

    
1018
  def testInitAndResources(self):
1019
    locking.GanetiLockManager._instance = None
1020
    self.GL = locking.GanetiLockManager()
1021
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1022
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1023
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1024

    
1025
    locking.GanetiLockManager._instance = None
1026
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1027
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1028
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1029
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1030

    
1031
    locking.GanetiLockManager._instance = None
1032
    self.GL = locking.GanetiLockManager(instances=self.instances)
1033
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1034
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1035
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1036
                     set(self.instances))
1037

    
1038
  def testAcquireRelease(self):
1039
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1040
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1041
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1042
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1043
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1044
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1045
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1046
    self.GL.release(locking.LEVEL_NODE)
1047
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1048
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1049
    self.GL.release(locking.LEVEL_INSTANCE)
1050
    self.assertRaises(errors.LockError, self.GL.acquire,
1051
                      locking.LEVEL_INSTANCE, ['i5'])
1052
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1053
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1054

    
1055
  def testAcquireWholeSets(self):
1056
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1057
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1058
                      set(self.instances))
1059
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1060
                      set(self.instances))
1061
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1062
                      set(self.nodes))
1063
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1064
                      set(self.nodes))
1065
    self.GL.release(locking.LEVEL_NODE)
1066
    self.GL.release(locking.LEVEL_INSTANCE)
1067
    self.GL.release(locking.LEVEL_CLUSTER)
1068

    
1069
  def testAcquireWholeAndPartial(self):
1070
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1071
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1072
                      set(self.instances))
1073
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1074
                      set(self.instances))
1075
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1076
                      set(['n2']))
1077
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1078
                      set(['n2']))
1079
    self.GL.release(locking.LEVEL_NODE)
1080
    self.GL.release(locking.LEVEL_INSTANCE)
1081
    self.GL.release(locking.LEVEL_CLUSTER)
1082

    
1083
  def testBGLDependency(self):
1084
    self.assertRaises(AssertionError, self.GL.acquire,
1085
                      locking.LEVEL_NODE, ['n1', 'n2'])
1086
    self.assertRaises(AssertionError, self.GL.acquire,
1087
                      locking.LEVEL_INSTANCE, ['i3'])
1088
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1089
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1090
    self.assertRaises(AssertionError, self.GL.release,
1091
                      locking.LEVEL_CLUSTER, ['BGL'])
1092
    self.assertRaises(AssertionError, self.GL.release,
1093
                      locking.LEVEL_CLUSTER)
1094
    self.GL.release(locking.LEVEL_NODE)
1095
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1096
    self.assertRaises(AssertionError, self.GL.release,
1097
                      locking.LEVEL_CLUSTER, ['BGL'])
1098
    self.assertRaises(AssertionError, self.GL.release,
1099
                      locking.LEVEL_CLUSTER)
1100
    self.GL.release(locking.LEVEL_INSTANCE)
1101

    
1102
  def testWrongOrder(self):
1103
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1104
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1105
    self.assertRaises(AssertionError, self.GL.acquire,
1106
                      locking.LEVEL_NODE, ['n1'])
1107
    self.assertRaises(AssertionError, self.GL.acquire,
1108
                      locking.LEVEL_INSTANCE, ['i2'])
1109

    
1110
  # Helper function to run as a thread that shared the BGL and then acquires
1111
  # some locks at another level.
1112
  def _doLock(self, level, names, shared):
1113
    try:
1114
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1115
      self.GL.acquire(level, names, shared=shared)
1116
      self.done.put('DONE')
1117
      self.GL.release(level)
1118
      self.GL.release(locking.LEVEL_CLUSTER)
1119
    except errors.LockError:
1120
      self.done.put('ERR')
1121

    
1122
  @_Repeat
1123
  def testConcurrency(self):
1124
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1125
    self._addThread(target=self._doLock,
1126
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1127
    self._waitThreads()
1128
    self.assertEqual(self.done.get_nowait(), 'DONE')
1129
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1130
    self._addThread(target=self._doLock,
1131
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1132
    self._waitThreads()
1133
    self.assertEqual(self.done.get_nowait(), 'DONE')
1134
    self._addThread(target=self._doLock,
1135
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1136
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1137
    self.GL.release(locking.LEVEL_INSTANCE)
1138
    self._waitThreads()
1139
    self.assertEqual(self.done.get_nowait(), 'DONE')
1140
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1141
    self._addThread(target=self._doLock,
1142
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1143
    self._waitThreads()
1144
    self.assertEqual(self.done.get_nowait(), 'DONE')
1145
    self._addThread(target=self._doLock,
1146
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1147
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1148
    self.GL.release(locking.LEVEL_INSTANCE)
1149
    self._waitThreads()
1150
    self.assertEqual(self.done.get(True, 1), 'DONE')
1151
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1152

    
1153

    
1154
if __name__ == '__main__':
1155
  unittest.main()
1156
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1157
  #unittest.TextTestRunner(verbosity=2).run(suite)