Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ c5fe2a67

History | View | Annotate | Download (43.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.done = Queue.Queue(0)
56
    self.threads = []
57

    
58
  def _addThread(self, *args, **kwargs):
59
    """Create and remember a new thread"""
60
    t = threading.Thread(*args, **kwargs)
61
    self.threads.append(t)
62
    t.start()
63
    return t
64

    
65
  def _waitThreads(self):
66
    """Wait for all our threads to finish"""
67
    for t in self.threads:
68
      t.join(60)
69
      self.failIf(t.isAlive())
70
    self.threads = []
71

    
72

    
73
class _ConditionTestCase(_ThreadedTestCase):
74
  """Common test case for conditions"""
75

    
76
  def setUp(self, cls):
77
    _ThreadedTestCase.setUp(self)
78
    self.lock = threading.Lock()
79
    self.cond = cls(self.lock)
80

    
81
  def _testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def _testNotification(self):
97
    def _NotifyAll():
98
      self.cond.acquire()
99
      self.cond.notifyAll()
100
      self.cond.release()
101

    
102
    self.cond.acquire()
103
    self._addThread(target=_NotifyAll)
104
    self.cond.wait()
105
    self.assert_(self.cond._is_owned())
106
    self.cond.release()
107
    self.assert_(not self.cond._is_owned())
108

    
109

    
110
class TestPipeCondition(_ConditionTestCase):
111
  """_PipeCondition tests"""
112

    
113
  def setUp(self):
114
    _ConditionTestCase.setUp(self, locking._PipeCondition)
115

    
116
  def testAcquireRelease(self):
117
    self._testAcquireRelease()
118

    
119
  def testNotification(self):
120
    self._testNotification()
121

    
122
  def _TestWait(self, fn):
123
    self._addThread(target=fn)
124
    self._addThread(target=fn)
125
    self._addThread(target=fn)
126

    
127
    # Wait for threads to be waiting
128
    self.assertEqual(self.done.get(True, 1), "A")
129
    self.assertEqual(self.done.get(True, 1), "A")
130
    self.assertEqual(self.done.get(True, 1), "A")
131

    
132
    self.assertRaises(Queue.Empty, self.done.get_nowait)
133

    
134
    self.cond.acquire()
135
    self.assertEqual(self.cond._nwaiters, 3)
136
    # This new thread can"t acquire the lock, and thus call wait, before we
137
    # release it
138
    self._addThread(target=fn)
139
    self.cond.notifyAll()
140
    self.assertRaises(Queue.Empty, self.done.get_nowait)
141
    self.cond.release()
142

    
143
    # We should now get 3 W and 1 A (for the new thread) in whatever order
144
    w = 0
145
    a = 0
146
    for i in range(4):
147
      got = self.done.get(True, 1)
148
      if got == "W":
149
        w += 1
150
      elif got == "A":
151
        a += 1
152
      else:
153
        self.fail("Got %s on the done queue" % got)
154

    
155
    self.assertEqual(w, 3)
156
    self.assertEqual(a, 1)
157

    
158
    self.cond.acquire()
159
    self.cond.notifyAll()
160
    self.cond.release()
161
    self._waitThreads()
162
    self.assertEqual(self.done.get_nowait(), "W")
163
    self.assertRaises(Queue.Empty, self.done.get_nowait)
164

    
165
  def testBlockingWait(self):
166
    def _BlockingWait():
167
      self.cond.acquire()
168
      self.done.put("A")
169
      self.cond.wait()
170
      self.cond.release()
171
      self.done.put("W")
172

    
173
    self._TestWait(_BlockingWait)
174

    
175
  def testLongTimeoutWait(self):
176
    def _Helper():
177
      self.cond.acquire()
178
      self.done.put("A")
179
      self.cond.wait(15.0)
180
      self.cond.release()
181
      self.done.put("W")
182

    
183
    self._TestWait(_Helper)
184

    
185
  def _TimeoutWait(self, timeout, check):
186
    self.cond.acquire()
187
    self.cond.wait(timeout)
188
    self.cond.release()
189
    self.done.put(check)
190

    
191
  def testShortTimeoutWait(self):
192
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
193
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
194
    self._waitThreads()
195
    self.assertEqual(self.done.get_nowait(), "T1")
196
    self.assertEqual(self.done.get_nowait(), "T1")
197
    self.assertRaises(Queue.Empty, self.done.get_nowait)
198

    
199
  def testZeroTimeoutWait(self):
200
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
201
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
202
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
203
    self._waitThreads()
204
    self.assertEqual(self.done.get_nowait(), "T0")
205
    self.assertEqual(self.done.get_nowait(), "T0")
206
    self.assertEqual(self.done.get_nowait(), "T0")
207
    self.assertRaises(Queue.Empty, self.done.get_nowait)
208

    
209

    
210
class TestSingleActionPipeCondition(unittest.TestCase):
211
  """_SingleActionPipeCondition tests"""
212

    
213
  def setUp(self):
214
    self.cond = locking._SingleActionPipeCondition()
215

    
216
  def testInitialization(self):
217
    self.assert_(self.cond._read_fd is not None)
218
    self.assert_(self.cond._write_fd is not None)
219
    self.assert_(self.cond._poller is not None)
220
    self.assertEqual(self.cond._nwaiters, 0)
221

    
222
  def testUsageCount(self):
223
    self.cond.StartWaiting()
224
    self.assert_(self.cond._read_fd is not None)
225
    self.assert_(self.cond._write_fd is not None)
226
    self.assert_(self.cond._poller is not None)
227
    self.assertEqual(self.cond._nwaiters, 1)
228

    
229
    # use again
230
    self.cond.StartWaiting()
231
    self.assertEqual(self.cond._nwaiters, 2)
232

    
233
    # there is more than one user
234
    self.assert_(not self.cond.DoneWaiting())
235
    self.assert_(self.cond._read_fd is not None)
236
    self.assert_(self.cond._write_fd is not None)
237
    self.assert_(self.cond._poller is not None)
238
    self.assertEqual(self.cond._nwaiters, 1)
239

    
240
    self.assert_(self.cond.DoneWaiting())
241
    self.assertEqual(self.cond._nwaiters, 0)
242
    self.assert_(self.cond._read_fd is None)
243
    self.assert_(self.cond._write_fd is None)
244
    self.assert_(self.cond._poller is None)
245

    
246
  def testNotify(self):
247
    wait1 = self.cond.StartWaiting()
248
    wait2 = self.cond.StartWaiting()
249

    
250
    self.assert_(self.cond._read_fd is not None)
251
    self.assert_(self.cond._write_fd is not None)
252
    self.assert_(self.cond._poller is not None)
253

    
254
    self.cond.notifyAll()
255

    
256
    self.assert_(self.cond._read_fd is not None)
257
    self.assert_(self.cond._write_fd is None)
258
    self.assert_(self.cond._poller is not None)
259

    
260
    self.assert_(not self.cond.DoneWaiting())
261

    
262
    self.assert_(self.cond._read_fd is not None)
263
    self.assert_(self.cond._write_fd is None)
264
    self.assert_(self.cond._poller is not None)
265

    
266
    self.assert_(self.cond.DoneWaiting())
267

    
268
    self.assert_(self.cond._read_fd is None)
269
    self.assert_(self.cond._write_fd is None)
270
    self.assert_(self.cond._poller is None)
271

    
272
  def testReusage(self):
273
    self.cond.StartWaiting()
274
    self.assert_(self.cond._read_fd is not None)
275
    self.assert_(self.cond._write_fd is not None)
276
    self.assert_(self.cond._poller is not None)
277

    
278
    self.assert_(self.cond.DoneWaiting())
279

    
280
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
281
    self.assert_(self.cond._read_fd is None)
282
    self.assert_(self.cond._write_fd is None)
283
    self.assert_(self.cond._poller is None)
284

    
285
  def testNotifyTwice(self):
286
    self.cond.notifyAll()
287
    self.assertRaises(RuntimeError, self.cond.notifyAll)
288

    
289

    
290
class TestSharedLock(_ThreadedTestCase):
291
  """SharedLock tests"""
292

    
293
  def setUp(self):
294
    _ThreadedTestCase.setUp(self)
295
    self.sl = locking.SharedLock()
296

    
297
  def testSequenceAndOwnership(self):
298
    self.assert_(not self.sl._is_owned())
299
    self.sl.acquire(shared=1)
300
    self.assert_(self.sl._is_owned())
301
    self.assert_(self.sl._is_owned(shared=1))
302
    self.assert_(not self.sl._is_owned(shared=0))
303
    self.sl.release()
304
    self.assert_(not self.sl._is_owned())
305
    self.sl.acquire()
306
    self.assert_(self.sl._is_owned())
307
    self.assert_(not self.sl._is_owned(shared=1))
308
    self.assert_(self.sl._is_owned(shared=0))
309
    self.sl.release()
310
    self.assert_(not self.sl._is_owned())
311
    self.sl.acquire(shared=1)
312
    self.assert_(self.sl._is_owned())
313
    self.assert_(self.sl._is_owned(shared=1))
314
    self.assert_(not self.sl._is_owned(shared=0))
315
    self.sl.release()
316
    self.assert_(not self.sl._is_owned())
317

    
318
  def testBooleanValue(self):
319
    # semaphores are supposed to return a true value on a successful acquire
320
    self.assert_(self.sl.acquire(shared=1))
321
    self.sl.release()
322
    self.assert_(self.sl.acquire())
323
    self.sl.release()
324

    
325
  def testDoubleLockingStoE(self):
326
    self.sl.acquire(shared=1)
327
    self.assertRaises(AssertionError, self.sl.acquire)
328

    
329
  def testDoubleLockingEtoS(self):
330
    self.sl.acquire()
331
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
332

    
333
  def testDoubleLockingStoS(self):
334
    self.sl.acquire(shared=1)
335
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
336

    
337
  def testDoubleLockingEtoE(self):
338
    self.sl.acquire()
339
    self.assertRaises(AssertionError, self.sl.acquire)
340

    
341
  # helper functions: called in a separate thread they acquire the lock, send
342
  # their identifier on the done queue, then release it.
343
  def _doItSharer(self):
344
    try:
345
      self.sl.acquire(shared=1)
346
      self.done.put('SHR')
347
      self.sl.release()
348
    except errors.LockError:
349
      self.done.put('ERR')
350

    
351
  def _doItExclusive(self):
352
    try:
353
      self.sl.acquire()
354
      self.done.put('EXC')
355
      self.sl.release()
356
    except errors.LockError:
357
      self.done.put('ERR')
358

    
359
  def _doItDelete(self):
360
    try:
361
      self.sl.delete()
362
      self.done.put('DEL')
363
    except errors.LockError:
364
      self.done.put('ERR')
365

    
366
  def testSharersCanCoexist(self):
367
    self.sl.acquire(shared=1)
368
    threading.Thread(target=self._doItSharer).start()
369
    self.assert_(self.done.get(True, 1))
370
    self.sl.release()
371

    
372
  @_Repeat
373
  def testExclusiveBlocksExclusive(self):
374
    self.sl.acquire()
375
    self._addThread(target=self._doItExclusive)
376
    self.assertRaises(Queue.Empty, self.done.get_nowait)
377
    self.sl.release()
378
    self._waitThreads()
379
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
380

    
381
  @_Repeat
382
  def testExclusiveBlocksDelete(self):
383
    self.sl.acquire()
384
    self._addThread(target=self._doItDelete)
385
    self.assertRaises(Queue.Empty, self.done.get_nowait)
386
    self.sl.release()
387
    self._waitThreads()
388
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
389
    self.sl = locking.SharedLock()
390

    
391
  @_Repeat
392
  def testExclusiveBlocksSharer(self):
393
    self.sl.acquire()
394
    self._addThread(target=self._doItSharer)
395
    self.assertRaises(Queue.Empty, self.done.get_nowait)
396
    self.sl.release()
397
    self._waitThreads()
398
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
399

    
400
  @_Repeat
401
  def testSharerBlocksExclusive(self):
402
    self.sl.acquire(shared=1)
403
    self._addThread(target=self._doItExclusive)
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
408

    
409
  @_Repeat
410
  def testSharerBlocksDelete(self):
411
    self.sl.acquire(shared=1)
412
    self._addThread(target=self._doItDelete)
413
    self.assertRaises(Queue.Empty, self.done.get_nowait)
414
    self.sl.release()
415
    self._waitThreads()
416
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
417
    self.sl = locking.SharedLock()
418

    
419
  @_Repeat
420
  def testWaitingExclusiveBlocksSharer(self):
421
    """SKIPPED testWaitingExclusiveBlockSharer"""
422
    return
423

    
424
    self.sl.acquire(shared=1)
425
    # the lock is acquired in shared mode...
426
    self._addThread(target=self._doItExclusive)
427
    # ...but now an exclusive is waiting...
428
    self._addThread(target=self._doItSharer)
429
    # ...so the sharer should be blocked as well
430
    self.assertRaises(Queue.Empty, self.done.get_nowait)
431
    self.sl.release()
432
    self._waitThreads()
433
    # The exclusive passed before
434
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
435
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
436

    
437
  @_Repeat
438
  def testWaitingSharerBlocksExclusive(self):
439
    """SKIPPED testWaitingSharerBlocksExclusive"""
440
    return
441

    
442
    self.sl.acquire()
443
    # the lock is acquired in exclusive mode...
444
    self._addThread(target=self._doItSharer)
445
    # ...but now a sharer is waiting...
446
    self._addThread(target=self._doItExclusive)
447
    # ...the exclusive is waiting too...
448
    self.assertRaises(Queue.Empty, self.done.get_nowait)
449
    self.sl.release()
450
    self._waitThreads()
451
    # The sharer passed before
452
    self.assertEqual(self.done.get_nowait(), 'SHR')
453
    self.assertEqual(self.done.get_nowait(), 'EXC')
454

    
455
  def testDelete(self):
456
    self.sl.delete()
457
    self.assertRaises(errors.LockError, self.sl.acquire)
458
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
459
    self.assertRaises(errors.LockError, self.sl.delete)
460

    
461
  def testDeleteTimeout(self):
462
    self.sl.delete(timeout=60)
463

    
464
  def testNoDeleteIfSharer(self):
465
    self.sl.acquire(shared=1)
466
    self.assertRaises(AssertionError, self.sl.delete)
467

    
468
  @_Repeat
469
  def testDeletePendingSharersExclusiveDelete(self):
470
    self.sl.acquire()
471
    self._addThread(target=self._doItSharer)
472
    self._addThread(target=self._doItSharer)
473
    self._addThread(target=self._doItExclusive)
474
    self._addThread(target=self._doItDelete)
475
    self.sl.delete()
476
    self._waitThreads()
477
    # The threads who were pending return ERR
478
    for _ in range(4):
479
      self.assertEqual(self.done.get_nowait(), 'ERR')
480
    self.sl = locking.SharedLock()
481

    
482
  @_Repeat
483
  def testDeletePendingDeleteExclusiveSharers(self):
484
    self.sl.acquire()
485
    self._addThread(target=self._doItDelete)
486
    self._addThread(target=self._doItExclusive)
487
    self._addThread(target=self._doItSharer)
488
    self._addThread(target=self._doItSharer)
489
    self.sl.delete()
490
    self._waitThreads()
491
    # The two threads who were pending return both ERR
492
    self.assertEqual(self.done.get_nowait(), 'ERR')
493
    self.assertEqual(self.done.get_nowait(), 'ERR')
494
    self.assertEqual(self.done.get_nowait(), 'ERR')
495
    self.assertEqual(self.done.get_nowait(), 'ERR')
496
    self.sl = locking.SharedLock()
497

    
498
  @_Repeat
499
  def testExclusiveAcquireTimeout(self):
500
    def _LockExclusive(wait):
501
      self.sl.acquire(shared=0)
502
      self.done.put("A: start sleep")
503
      time.sleep(wait)
504
      self.done.put("A: end sleep")
505
      self.sl.release()
506

    
507
    for shared in [0, 1]:
508
      # Start thread to hold lock for 20 ms
509
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
510

    
511
      # Wait for sleep to begin
512
      self.assertEqual(self.done.get(), "A: start sleep")
513

    
514
      # Wait up to 100 ms to get lock
515
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
516
      self.done.put("got 2nd")
517
      self.sl.release()
518

    
519
      self._waitThreads()
520

    
521
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
522
      self.assertEqual(self.done.get_nowait(), "got 2nd")
523
      self.assertRaises(Queue.Empty, self.done.get_nowait)
524

    
525
  @_Repeat
526
  def testAcquireExpiringTimeout(self):
527
    def _AcquireWithTimeout(shared, timeout):
528
      if not self.sl.acquire(shared=shared, timeout=timeout):
529
        self.done.put("timeout")
530

    
531
    for shared in [0, 1]:
532
      # Lock exclusively
533
      self.sl.acquire()
534

    
535
      # Start shared acquires with timeout between 0 and 20 ms
536
      for i in xrange(11):
537
        self._addThread(target=_AcquireWithTimeout,
538
                        args=(shared, i * 2.0 / 1000.0))
539

    
540
      # Wait for threads to finish (makes sure the acquire timeout expires
541
      # before releasing the lock)
542
      self._waitThreads()
543

    
544
      # Release lock
545
      self.sl.release()
546

    
547
      for _ in xrange(11):
548
        self.assertEqual(self.done.get_nowait(), "timeout")
549

    
550
      self.assertRaises(Queue.Empty, self.done.get_nowait)
551

    
552
  @_Repeat
553
  def testSharedSkipExclusiveAcquires(self):
554
    # Tests whether shared acquires jump in front of exclusive acquires in the
555
    # queue.
556

    
557
    # Get exclusive lock while we fill the queue
558
    self.sl.acquire()
559

    
560
    def _Acquire(shared, name):
561
      if not self.sl.acquire(shared=shared):
562
        return
563

    
564
      self.done.put(name)
565
      self.sl.release()
566

    
567
    # Start shared acquires
568
    for _ in xrange(5):
569
      self._addThread(target=_Acquire, args=(1, "shared A"))
570

    
571
    # Start exclusive acquires
572
    for _ in xrange(3):
573
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
574

    
575
    # More shared acquires
576
    for _ in xrange(5):
577
      self._addThread(target=_Acquire, args=(1, "shared C"))
578

    
579
    # More exclusive acquires
580
    for _ in xrange(3):
581
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
582

    
583
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
584
    # together. There's no way to wait for SharedLock.acquire to start
585
    # its work. Hence the timeout of 2 seconds.
586
    pending = 0
587
    end_time = time.time() + 2.0
588
    while time.time() < end_time:
589
      pending = self.sl._count_pending()
590
      self.assert_(pending >= 0 and pending <= 7)
591
      if pending == 7:
592
        break
593
      time.sleep(0.05)
594
    self.assertEqual(pending, 7)
595

    
596
    # Release exclusive lock and wait
597
    self.sl.release()
598

    
599
    self._waitThreads()
600

    
601
    # Check sequence
602
    shr_a = 0
603
    shr_c = 0
604
    for _ in xrange(10):
605
      # Shared locks aren't guaranteed to be notified in order, but they'll be
606
      # first
607
      tmp = self.done.get_nowait()
608
      if tmp == "shared A":
609
        shr_a += 1
610
      elif tmp == "shared C":
611
        shr_c += 1
612
    self.assertEqual(shr_a, 5)
613
    self.assertEqual(shr_c, 5)
614

    
615
    for _ in xrange(3):
616
      self.assertEqual(self.done.get_nowait(), "exclusive B")
617

    
618
    for _ in xrange(3):
619
      self.assertEqual(self.done.get_nowait(), "exclusive D")
620

    
621
    self.assertRaises(Queue.Empty, self.done.get_nowait)
622

    
623
  @_Repeat
624
  def testMixedAcquireTimeout(self):
625
    sync = threading.Condition()
626

    
627
    def _AcquireShared(ev):
628
      if not self.sl.acquire(shared=1, timeout=None):
629
        return
630

    
631
      self.done.put("shared")
632

    
633
      # Notify main thread
634
      ev.set()
635

    
636
      # Wait for notification
637
      sync.acquire()
638
      try:
639
        sync.wait()
640
      finally:
641
        sync.release()
642

    
643
      # Release lock
644
      self.sl.release()
645

    
646
    acquires = []
647
    for _ in xrange(3):
648
      ev = threading.Event()
649
      self._addThread(target=_AcquireShared, args=(ev, ))
650
      acquires.append(ev)
651

    
652
    # Wait for all acquires to finish
653
    for i in acquires:
654
      i.wait()
655

    
656
    self.assertEqual(self.sl._count_pending(), 0)
657

    
658
    # Try to get exclusive lock
659
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
660

    
661
    # Acquire exclusive without timeout
662
    exclsync = threading.Condition()
663
    exclev = threading.Event()
664

    
665
    def _AcquireExclusive():
666
      if not self.sl.acquire(shared=0):
667
        return
668

    
669
      self.done.put("exclusive")
670

    
671
      # Notify main thread
672
      exclev.set()
673

    
674
      exclsync.acquire()
675
      try:
676
        exclsync.wait()
677
      finally:
678
        exclsync.release()
679

    
680
      self.sl.release()
681

    
682
    self._addThread(target=_AcquireExclusive)
683

    
684
    # Try to get exclusive lock
685
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
686

    
687
    # Make all shared holders release their locks
688
    sync.acquire()
689
    try:
690
      sync.notifyAll()
691
    finally:
692
      sync.release()
693

    
694
    # Wait for exclusive acquire to succeed
695
    exclev.wait()
696

    
697
    self.assertEqual(self.sl._count_pending(), 0)
698

    
699
    # Try to get exclusive lock
700
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
701

    
702
    def _AcquireSharedSimple():
703
      if self.sl.acquire(shared=1, timeout=None):
704
        self.done.put("shared2")
705
        self.sl.release()
706

    
707
    for _ in xrange(10):
708
      self._addThread(target=_AcquireSharedSimple)
709

    
710
    # Tell exclusive lock to release
711
    exclsync.acquire()
712
    try:
713
      exclsync.notifyAll()
714
    finally:
715
      exclsync.release()
716

    
717
    # Wait for everything to finish
718
    self._waitThreads()
719

    
720
    self.assertEqual(self.sl._count_pending(), 0)
721

    
722
    # Check sequence
723
    for _ in xrange(3):
724
      self.assertEqual(self.done.get_nowait(), "shared")
725

    
726
    self.assertEqual(self.done.get_nowait(), "exclusive")
727

    
728
    for _ in xrange(10):
729
      self.assertEqual(self.done.get_nowait(), "shared2")
730

    
731
    self.assertRaises(Queue.Empty, self.done.get_nowait)
732

    
733

    
734
class TestSSynchronizedDecorator(_ThreadedTestCase):
735
  """Shared Lock Synchronized decorator test"""
736

    
737
  def setUp(self):
738
    _ThreadedTestCase.setUp(self)
739

    
740
  @locking.ssynchronized(_decoratorlock)
741
  def _doItExclusive(self):
742
    self.assert_(_decoratorlock._is_owned())
743
    self.done.put('EXC')
744

    
745
  @locking.ssynchronized(_decoratorlock, shared=1)
746
  def _doItSharer(self):
747
    self.assert_(_decoratorlock._is_owned(shared=1))
748
    self.done.put('SHR')
749

    
750
  def testDecoratedFunctions(self):
751
    self._doItExclusive()
752
    self.assert_(not _decoratorlock._is_owned())
753
    self._doItSharer()
754
    self.assert_(not _decoratorlock._is_owned())
755

    
756
  def testSharersCanCoexist(self):
757
    _decoratorlock.acquire(shared=1)
758
    threading.Thread(target=self._doItSharer).start()
759
    self.assert_(self.done.get(True, 1))
760
    _decoratorlock.release()
761

    
762
  @_Repeat
763
  def testExclusiveBlocksExclusive(self):
764
    _decoratorlock.acquire()
765
    self._addThread(target=self._doItExclusive)
766
    # give it a bit of time to check that it's not actually doing anything
767
    self.assertRaises(Queue.Empty, self.done.get_nowait)
768
    _decoratorlock.release()
769
    self._waitThreads()
770
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
771

    
772
  @_Repeat
773
  def testExclusiveBlocksSharer(self):
774
    _decoratorlock.acquire()
775
    self._addThread(target=self._doItSharer)
776
    self.assertRaises(Queue.Empty, self.done.get_nowait)
777
    _decoratorlock.release()
778
    self._waitThreads()
779
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
780

    
781
  @_Repeat
782
  def testSharerBlocksExclusive(self):
783
    _decoratorlock.acquire(shared=1)
784
    self._addThread(target=self._doItExclusive)
785
    self.assertRaises(Queue.Empty, self.done.get_nowait)
786
    _decoratorlock.release()
787
    self._waitThreads()
788
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
789

    
790

    
791
class TestLockSet(_ThreadedTestCase):
792
  """LockSet tests"""
793

    
794
  def setUp(self):
795
    _ThreadedTestCase.setUp(self)
796
    self._setUpLS()
797

    
798
  def _setUpLS(self):
799
    """Helper to (re)initialize the lock set"""
800
    self.resources = ['one', 'two', 'three']
801
    self.ls = locking.LockSet(members=self.resources)
802

    
803
  def testResources(self):
804
    self.assertEquals(self.ls._names(), set(self.resources))
805
    newls = locking.LockSet()
806
    self.assertEquals(newls._names(), set())
807

    
808
  def testAcquireRelease(self):
809
    self.assert_(self.ls.acquire('one'))
810
    self.assertEquals(self.ls._list_owned(), set(['one']))
811
    self.ls.release()
812
    self.assertEquals(self.ls._list_owned(), set())
813
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
814
    self.assertEquals(self.ls._list_owned(), set(['one']))
815
    self.ls.release()
816
    self.assertEquals(self.ls._list_owned(), set())
817
    self.ls.acquire(['one', 'two', 'three'])
818
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
819
    self.ls.release('one')
820
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
821
    self.ls.release(['three'])
822
    self.assertEquals(self.ls._list_owned(), set(['two']))
823
    self.ls.release()
824
    self.assertEquals(self.ls._list_owned(), set())
825
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
826
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
827
    self.ls.release()
828
    self.assertEquals(self.ls._list_owned(), set())
829

    
830
  def testNoDoubleAcquire(self):
831
    self.ls.acquire('one')
832
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
833
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
834
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
835
    self.ls.release()
836
    self.ls.acquire(['one', 'three'])
837
    self.ls.release('one')
838
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
839
    self.ls.release('three')
840

    
841
  def testNoWrongRelease(self):
842
    self.assertRaises(AssertionError, self.ls.release)
843
    self.ls.acquire('one')
844
    self.assertRaises(AssertionError, self.ls.release, 'two')
845

    
846
  def testAddRemove(self):
847
    self.ls.add('four')
848
    self.assertEquals(self.ls._list_owned(), set())
849
    self.assert_('four' in self.ls._names())
850
    self.ls.add(['five', 'six', 'seven'], acquired=1)
851
    self.assert_('five' in self.ls._names())
852
    self.assert_('six' in self.ls._names())
853
    self.assert_('seven' in self.ls._names())
854
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
855
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
856
    self.assert_('five' not in self.ls._names())
857
    self.assert_('six' not in self.ls._names())
858
    self.assertEquals(self.ls._list_owned(), set(['seven']))
859
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
860
    self.ls.remove('seven')
861
    self.assert_('seven' not in self.ls._names())
862
    self.assertEquals(self.ls._list_owned(), set([]))
863
    self.ls.acquire(None, shared=1)
864
    self.assertRaises(AssertionError, self.ls.add, 'eight')
865
    self.ls.release()
866
    self.ls.acquire(None)
867
    self.ls.add('eight', acquired=1)
868
    self.assert_('eight' in self.ls._names())
869
    self.assert_('eight' in self.ls._list_owned())
870
    self.ls.add('nine')
871
    self.assert_('nine' in self.ls._names())
872
    self.assert_('nine' not in self.ls._list_owned())
873
    self.ls.release()
874
    self.ls.remove(['two'])
875
    self.assert_('two' not in self.ls._names())
876
    self.ls.acquire('three')
877
    self.assertEquals(self.ls.remove(['three']), ['three'])
878
    self.assert_('three' not in self.ls._names())
879
    self.assertEquals(self.ls.remove('three'), [])
880
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
881
    self.assert_('one' not in self.ls._names())
882

    
883
  def testRemoveNonBlocking(self):
884
    self.ls.acquire('one')
885
    self.assertEquals(self.ls.remove('one'), ['one'])
886
    self.ls.acquire(['two', 'three'])
887
    self.assertEquals(self.ls.remove(['two', 'three']),
888
                      ['two', 'three'])
889

    
890
  def testNoDoubleAdd(self):
891
    self.assertRaises(errors.LockError, self.ls.add, 'two')
892
    self.ls.add('four')
893
    self.assertRaises(errors.LockError, self.ls.add, 'four')
894

    
895
  def testNoWrongRemoves(self):
896
    self.ls.acquire(['one', 'three'], shared=1)
897
    # Cannot remove 'two' while holding something which is not a superset
898
    self.assertRaises(AssertionError, self.ls.remove, 'two')
899
    # Cannot remove 'three' as we are sharing it
900
    self.assertRaises(AssertionError, self.ls.remove, 'three')
901

    
902
  def testAcquireSetLock(self):
903
    # acquire the set-lock exclusively
904
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
905
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
906
    self.assertEquals(self.ls._is_owned(), True)
907
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
908
    # I can still add/remove elements...
909
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
910
    self.assert_(self.ls.add('six'))
911
    self.ls.release()
912
    # share the set-lock
913
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
914
    # adding new elements is not possible
915
    self.assertRaises(AssertionError, self.ls.add, 'five')
916
    self.ls.release()
917

    
918
  def testAcquireWithRepetitions(self):
919
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
920
                      set(['two', 'two', 'three']))
921
    self.ls.release(['two', 'two'])
922
    self.assertEquals(self.ls._list_owned(), set(['three']))
923

    
924
  def testEmptyAcquire(self):
925
    # Acquire an empty list of locks...
926
    self.assertEquals(self.ls.acquire([]), set())
927
    self.assertEquals(self.ls._list_owned(), set())
928
    # New locks can still be addded
929
    self.assert_(self.ls.add('six'))
930
    # "re-acquiring" is not an issue, since we had really acquired nothing
931
    self.assertEquals(self.ls.acquire([], shared=1), set())
932
    self.assertEquals(self.ls._list_owned(), set())
933
    # We haven't really acquired anything, so we cannot release
934
    self.assertRaises(AssertionError, self.ls.release)
935

    
936
  def _doLockSet(self, names, shared):
937
    try:
938
      self.ls.acquire(names, shared=shared)
939
      self.done.put('DONE')
940
      self.ls.release()
941
    except errors.LockError:
942
      self.done.put('ERR')
943

    
944
  def _doAddSet(self, names):
945
    try:
946
      self.ls.add(names, acquired=1)
947
      self.done.put('DONE')
948
      self.ls.release()
949
    except errors.LockError:
950
      self.done.put('ERR')
951

    
952
  def _doRemoveSet(self, names):
953
    self.done.put(self.ls.remove(names))
954

    
955
  @_Repeat
956
  def testConcurrentSharedAcquire(self):
957
    self.ls.acquire(['one', 'two'], shared=1)
958
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
959
    self._waitThreads()
960
    self.assertEqual(self.done.get_nowait(), 'DONE')
961
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
962
    self._waitThreads()
963
    self.assertEqual(self.done.get_nowait(), 'DONE')
964
    self._addThread(target=self._doLockSet, args=('three', 1))
965
    self._waitThreads()
966
    self.assertEqual(self.done.get_nowait(), 'DONE')
967
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
968
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
969
    self.assertRaises(Queue.Empty, self.done.get_nowait)
970
    self.ls.release()
971
    self._waitThreads()
972
    self.assertEqual(self.done.get_nowait(), 'DONE')
973
    self.assertEqual(self.done.get_nowait(), 'DONE')
974

    
975
  @_Repeat
976
  def testConcurrentExclusiveAcquire(self):
977
    self.ls.acquire(['one', 'two'])
978
    self._addThread(target=self._doLockSet, args=('three', 1))
979
    self._waitThreads()
980
    self.assertEqual(self.done.get_nowait(), 'DONE')
981
    self._addThread(target=self._doLockSet, args=('three', 0))
982
    self._waitThreads()
983
    self.assertEqual(self.done.get_nowait(), 'DONE')
984
    self.assertRaises(Queue.Empty, self.done.get_nowait)
985
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
986
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
987
    self._addThread(target=self._doLockSet, args=('one', 0))
988
    self._addThread(target=self._doLockSet, args=('one', 1))
989
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
990
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
991
    self.assertRaises(Queue.Empty, self.done.get_nowait)
992
    self.ls.release()
993
    self._waitThreads()
994
    for _ in range(6):
995
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
996

    
997
  @_Repeat
998
  def testConcurrentRemove(self):
999
    self.ls.add('four')
1000
    self.ls.acquire(['one', 'two', 'four'])
1001
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1002
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1003
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1004
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1005
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1006
    self.ls.remove('one')
1007
    self.ls.release()
1008
    self._waitThreads()
1009
    for i in range(4):
1010
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1011
    self.ls.add(['five', 'six'], acquired=1)
1012
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1013
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1014
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1015
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1016
    self.ls.remove('five')
1017
    self.ls.release()
1018
    self._waitThreads()
1019
    for i in range(4):
1020
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1021
    self.ls.acquire(['three', 'four'])
1022
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1023
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1024
    self.ls.remove('four')
1025
    self._waitThreads()
1026
    self.assertEqual(self.done.get_nowait(), ['six'])
1027
    self._addThread(target=self._doRemoveSet, args=(['two']))
1028
    self._waitThreads()
1029
    self.assertEqual(self.done.get_nowait(), ['two'])
1030
    self.ls.release()
1031
    # reset lockset
1032
    self._setUpLS()
1033

    
1034
  @_Repeat
1035
  def testConcurrentSharedSetLock(self):
1036
    # share the set-lock...
1037
    self.ls.acquire(None, shared=1)
1038
    # ...another thread can share it too
1039
    self._addThread(target=self._doLockSet, args=(None, 1))
1040
    self._waitThreads()
1041
    self.assertEqual(self.done.get_nowait(), 'DONE')
1042
    # ...or just share some elements
1043
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1044
    self._waitThreads()
1045
    self.assertEqual(self.done.get_nowait(), 'DONE')
1046
    # ...but not add new ones or remove any
1047
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1048
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1049
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1050
    # this just releases the set-lock
1051
    self.ls.release([])
1052
    t.join(60)
1053
    self.assertEqual(self.done.get_nowait(), 'DONE')
1054
    # release the lock on the actual elements so remove() can proceed too
1055
    self.ls.release()
1056
    self._waitThreads()
1057
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1058
    # reset lockset
1059
    self._setUpLS()
1060

    
1061
  @_Repeat
1062
  def testConcurrentExclusiveSetLock(self):
1063
    # acquire the set-lock...
1064
    self.ls.acquire(None, shared=0)
1065
    # ...no one can do anything else
1066
    self._addThread(target=self._doLockSet, args=(None, 1))
1067
    self._addThread(target=self._doLockSet, args=(None, 0))
1068
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1069
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1070
    self._addThread(target=self._doAddSet, args=(['nine']))
1071
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1072
    self.ls.release()
1073
    self._waitThreads()
1074
    for _ in range(5):
1075
      self.assertEqual(self.done.get(True, 1), 'DONE')
1076
    # cleanup
1077
    self._setUpLS()
1078

    
1079
  @_Repeat
1080
  def testConcurrentSetLockAdd(self):
1081
    self.ls.acquire('one')
1082
    # Another thread wants the whole SetLock
1083
    self._addThread(target=self._doLockSet, args=(None, 0))
1084
    self._addThread(target=self._doLockSet, args=(None, 1))
1085
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1086
    self.assertRaises(AssertionError, self.ls.add, 'four')
1087
    self.ls.release()
1088
    self._waitThreads()
1089
    self.assertEqual(self.done.get_nowait(), 'DONE')
1090
    self.assertEqual(self.done.get_nowait(), 'DONE')
1091
    self.ls.acquire(None)
1092
    self._addThread(target=self._doLockSet, args=(None, 0))
1093
    self._addThread(target=self._doLockSet, args=(None, 1))
1094
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1095
    self.ls.add('four')
1096
    self.ls.add('five', acquired=1)
1097
    self.ls.add('six', acquired=1, shared=1)
1098
    self.assertEquals(self.ls._list_owned(),
1099
      set(['one', 'two', 'three', 'five', 'six']))
1100
    self.assertEquals(self.ls._is_owned(), True)
1101
    self.assertEquals(self.ls._names(),
1102
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1103
    self.ls.release()
1104
    self._waitThreads()
1105
    self.assertEqual(self.done.get_nowait(), 'DONE')
1106
    self.assertEqual(self.done.get_nowait(), 'DONE')
1107
    self._setUpLS()
1108

    
1109
  @_Repeat
1110
  def testEmptyLockSet(self):
1111
    # get the set-lock
1112
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1113
    # now empty it...
1114
    self.ls.remove(['one', 'two', 'three'])
1115
    # and adds/locks by another thread still wait
1116
    self._addThread(target=self._doAddSet, args=(['nine']))
1117
    self._addThread(target=self._doLockSet, args=(None, 1))
1118
    self._addThread(target=self._doLockSet, args=(None, 0))
1119
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1120
    self.ls.release()
1121
    self._waitThreads()
1122
    for _ in range(3):
1123
      self.assertEqual(self.done.get_nowait(), 'DONE')
1124
    # empty it again...
1125
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1126
    # now share it...
1127
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1128
    # other sharers can go, adds still wait
1129
    self._addThread(target=self._doLockSet, args=(None, 1))
1130
    self._waitThreads()
1131
    self.assertEqual(self.done.get_nowait(), 'DONE')
1132
    self._addThread(target=self._doAddSet, args=(['nine']))
1133
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1134
    self.ls.release()
1135
    self._waitThreads()
1136
    self.assertEqual(self.done.get_nowait(), 'DONE')
1137
    self._setUpLS()
1138

    
1139

    
1140
class TestGanetiLockManager(_ThreadedTestCase):
1141

    
1142
  def setUp(self):
1143
    _ThreadedTestCase.setUp(self)
1144
    self.nodes=['n1', 'n2']
1145
    self.instances=['i1', 'i2', 'i3']
1146
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1147
                                        instances=self.instances)
1148

    
1149
  def tearDown(self):
1150
    # Don't try this at home...
1151
    locking.GanetiLockManager._instance = None
1152

    
1153
  def testLockingConstants(self):
1154
    # The locking library internally cheats by assuming its constants have some
1155
    # relationships with each other. Check those hold true.
1156
    # This relationship is also used in the Processor to recursively acquire
1157
    # the right locks. Again, please don't break it.
1158
    for i in range(len(locking.LEVELS)):
1159
      self.assertEqual(i, locking.LEVELS[i])
1160

    
1161
  def testDoubleGLFails(self):
1162
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1163

    
1164
  def testLockNames(self):
1165
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1166
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1167
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1168
                     set(self.instances))
1169

    
1170
  def testInitAndResources(self):
1171
    locking.GanetiLockManager._instance = None
1172
    self.GL = locking.GanetiLockManager()
1173
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1174
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1175
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1176

    
1177
    locking.GanetiLockManager._instance = None
1178
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1179
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1180
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1181
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1182

    
1183
    locking.GanetiLockManager._instance = None
1184
    self.GL = locking.GanetiLockManager(instances=self.instances)
1185
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1186
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1187
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1188
                     set(self.instances))
1189

    
1190
  def testAcquireRelease(self):
1191
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1192
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1193
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1194
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1195
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1196
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1197
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1198
    self.GL.release(locking.LEVEL_NODE)
1199
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1200
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1201
    self.GL.release(locking.LEVEL_INSTANCE)
1202
    self.assertRaises(errors.LockError, self.GL.acquire,
1203
                      locking.LEVEL_INSTANCE, ['i5'])
1204
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1205
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1206

    
1207
  def testAcquireWholeSets(self):
1208
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1209
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1210
                      set(self.instances))
1211
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1212
                      set(self.instances))
1213
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1214
                      set(self.nodes))
1215
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1216
                      set(self.nodes))
1217
    self.GL.release(locking.LEVEL_NODE)
1218
    self.GL.release(locking.LEVEL_INSTANCE)
1219
    self.GL.release(locking.LEVEL_CLUSTER)
1220

    
1221
  def testAcquireWholeAndPartial(self):
1222
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1223
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1224
                      set(self.instances))
1225
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1226
                      set(self.instances))
1227
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1228
                      set(['n2']))
1229
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1230
                      set(['n2']))
1231
    self.GL.release(locking.LEVEL_NODE)
1232
    self.GL.release(locking.LEVEL_INSTANCE)
1233
    self.GL.release(locking.LEVEL_CLUSTER)
1234

    
1235
  def testBGLDependency(self):
1236
    self.assertRaises(AssertionError, self.GL.acquire,
1237
                      locking.LEVEL_NODE, ['n1', 'n2'])
1238
    self.assertRaises(AssertionError, self.GL.acquire,
1239
                      locking.LEVEL_INSTANCE, ['i3'])
1240
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1241
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1242
    self.assertRaises(AssertionError, self.GL.release,
1243
                      locking.LEVEL_CLUSTER, ['BGL'])
1244
    self.assertRaises(AssertionError, self.GL.release,
1245
                      locking.LEVEL_CLUSTER)
1246
    self.GL.release(locking.LEVEL_NODE)
1247
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1248
    self.assertRaises(AssertionError, self.GL.release,
1249
                      locking.LEVEL_CLUSTER, ['BGL'])
1250
    self.assertRaises(AssertionError, self.GL.release,
1251
                      locking.LEVEL_CLUSTER)
1252
    self.GL.release(locking.LEVEL_INSTANCE)
1253

    
1254
  def testWrongOrder(self):
1255
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1256
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1257
    self.assertRaises(AssertionError, self.GL.acquire,
1258
                      locking.LEVEL_NODE, ['n1'])
1259
    self.assertRaises(AssertionError, self.GL.acquire,
1260
                      locking.LEVEL_INSTANCE, ['i2'])
1261

    
1262
  # Helper function to run as a thread that shared the BGL and then acquires
1263
  # some locks at another level.
1264
  def _doLock(self, level, names, shared):
1265
    try:
1266
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1267
      self.GL.acquire(level, names, shared=shared)
1268
      self.done.put('DONE')
1269
      self.GL.release(level)
1270
      self.GL.release(locking.LEVEL_CLUSTER)
1271
    except errors.LockError:
1272
      self.done.put('ERR')
1273

    
1274
  @_Repeat
1275
  def testConcurrency(self):
1276
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1277
    self._addThread(target=self._doLock,
1278
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1279
    self._waitThreads()
1280
    self.assertEqual(self.done.get_nowait(), 'DONE')
1281
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1282
    self._addThread(target=self._doLock,
1283
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1284
    self._waitThreads()
1285
    self.assertEqual(self.done.get_nowait(), 'DONE')
1286
    self._addThread(target=self._doLock,
1287
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1288
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1289
    self.GL.release(locking.LEVEL_INSTANCE)
1290
    self._waitThreads()
1291
    self.assertEqual(self.done.get_nowait(), 'DONE')
1292
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1293
    self._addThread(target=self._doLock,
1294
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1295
    self._waitThreads()
1296
    self.assertEqual(self.done.get_nowait(), 'DONE')
1297
    self._addThread(target=self._doLock,
1298
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1299
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1300
    self.GL.release(locking.LEVEL_INSTANCE)
1301
    self._waitThreads()
1302
    self.assertEqual(self.done.get(True, 1), 'DONE')
1303
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1304

    
1305

    
1306
if __name__ == '__main__':
1307
  unittest.main()
1308
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1309
  #unittest.TextTestRunner(verbosity=2).run(suite)