Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ a66bd91b

History | View | Annotate | Download (43.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.threads = []
56

    
57
  def _addThread(self, *args, **kwargs):
58
    """Create and remember a new thread"""
59
    t = threading.Thread(*args, **kwargs)
60
    self.threads.append(t)
61
    t.start()
62
    return t
63

    
64
  def _waitThreads(self):
65
    """Wait for all our threads to finish"""
66
    for t in self.threads:
67
      t.join(60)
68
      self.failIf(t.isAlive())
69
    self.threads = []
70

    
71

    
72
class TestPipeCondition(_ThreadedTestCase):
73
  """_PipeCondition tests"""
74

    
75
  def setUp(self):
76
    _ThreadedTestCase.setUp(self)
77
    self.lock = threading.Lock()
78
    self.cond = locking._PipeCondition(self.lock)
79
    self.done = Queue.Queue(0)
80

    
81
  def testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def testNotification(self):
97
    def _NotifyAll():
98
      self.cond.acquire()
99
      self.cond.notifyAll()
100
      self.cond.release()
101

    
102
    self.cond.acquire()
103
    self._addThread(target=_NotifyAll)
104
    self.cond.wait()
105
    self.assert_(self.cond._is_owned())
106
    self.cond.release()
107
    self.assert_(not self.cond._is_owned())
108

    
109
  def _TestWait(self, fn):
110
    self._addThread(target=fn)
111
    self._addThread(target=fn)
112
    self._addThread(target=fn)
113

    
114
    # Wait for threads to be waiting
115
    self.assertEqual(self.done.get(True, 1), "A")
116
    self.assertEqual(self.done.get(True, 1), "A")
117
    self.assertEqual(self.done.get(True, 1), "A")
118

    
119
    self.assertRaises(Queue.Empty, self.done.get_nowait)
120

    
121
    self.cond.acquire()
122
    self.assertEqual(self.cond._nwaiters, 3)
123
    # This new thread can"t acquire the lock, and thus call wait, before we
124
    # release it
125
    self._addThread(target=fn)
126
    self.cond.notifyAll()
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.release()
129

    
130
    # We should now get 3 W and 1 A (for the new thread) in whatever order
131
    w = 0
132
    a = 0
133
    for i in range(4):
134
      got = self.done.get(True, 1)
135
      if got == "W":
136
        w += 1
137
      elif got == "A":
138
        a += 1
139
      else:
140
        self.fail("Got %s on the done queue" % got)
141

    
142
    self.assertEqual(w, 3)
143
    self.assertEqual(a, 1)
144

    
145
    self.cond.acquire()
146
    self.cond.notifyAll()
147
    self.cond.release()
148
    self._waitThreads()
149
    self.assertEqual(self.done.get_nowait(), "W")
150
    self.assertRaises(Queue.Empty, self.done.get_nowait)
151

    
152
  def testBlockingWait(self):
153
    def _BlockingWait():
154
      self.cond.acquire()
155
      self.done.put("A")
156
      self.cond.wait()
157
      self.cond.release()
158
      self.done.put("W")
159

    
160
    self._TestWait(_BlockingWait)
161

    
162
  def testLongTimeoutWait(self):
163
    def _Helper():
164
      self.cond.acquire()
165
      self.done.put("A")
166
      self.cond.wait(15.0)
167
      self.cond.release()
168
      self.done.put("W")
169

    
170
    self._TestWait(_Helper)
171

    
172
  def _TimeoutWait(self, timeout, check):
173
    self.cond.acquire()
174
    self.cond.wait(timeout)
175
    self.cond.release()
176
    self.done.put(check)
177

    
178
  def testShortTimeoutWait(self):
179
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
180
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
181
    self._waitThreads()
182
    self.assertEqual(self.done.get_nowait(), "T1")
183
    self.assertEqual(self.done.get_nowait(), "T1")
184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
185

    
186
  def testZeroTimeoutWait(self):
187
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
188
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
189
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
190
    self._waitThreads()
191
    self.assertEqual(self.done.get_nowait(), "T0")
192
    self.assertEqual(self.done.get_nowait(), "T0")
193
    self.assertEqual(self.done.get_nowait(), "T0")
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195

    
196

    
197
class TestSingleActionPipeCondition(unittest.TestCase):
198
  """_SingleActionPipeCondition tests"""
199

    
200
  def setUp(self):
201
    self.cond = locking._SingleActionPipeCondition()
202

    
203
  def testInitialization(self):
204
    self.assert_(self.cond._read_fd is not None)
205
    self.assert_(self.cond._write_fd is not None)
206
    self.assert_(self.cond._poller is not None)
207
    self.assertEqual(self.cond._nwaiters, 0)
208

    
209
  def testUsageCount(self):
210
    self.cond.StartWaiting()
211
    self.assert_(self.cond._read_fd is not None)
212
    self.assert_(self.cond._write_fd is not None)
213
    self.assert_(self.cond._poller is not None)
214
    self.assertEqual(self.cond._nwaiters, 1)
215

    
216
    # use again
217
    self.cond.StartWaiting()
218
    self.assertEqual(self.cond._nwaiters, 2)
219

    
220
    # there is more than one user
221
    self.assert_(not self.cond.DoneWaiting())
222
    self.assert_(self.cond._read_fd is not None)
223
    self.assert_(self.cond._write_fd is not None)
224
    self.assert_(self.cond._poller is not None)
225
    self.assertEqual(self.cond._nwaiters, 1)
226

    
227
    self.assert_(self.cond.DoneWaiting())
228
    self.assertEqual(self.cond._nwaiters, 0)
229
    self.assert_(self.cond._read_fd is None)
230
    self.assert_(self.cond._write_fd is None)
231
    self.assert_(self.cond._poller is None)
232

    
233
  def testNotify(self):
234
    wait1 = self.cond.StartWaiting()
235
    wait2 = self.cond.StartWaiting()
236

    
237
    self.assert_(self.cond._read_fd is not None)
238
    self.assert_(self.cond._write_fd is not None)
239
    self.assert_(self.cond._poller is not None)
240

    
241
    self.cond.notifyAll()
242

    
243
    self.assert_(self.cond._read_fd is not None)
244
    self.assert_(self.cond._write_fd is None)
245
    self.assert_(self.cond._poller is not None)
246

    
247
    self.assert_(not self.cond.DoneWaiting())
248

    
249
    self.assert_(self.cond._read_fd is not None)
250
    self.assert_(self.cond._write_fd is None)
251
    self.assert_(self.cond._poller is not None)
252

    
253
    self.assert_(self.cond.DoneWaiting())
254

    
255
    self.assert_(self.cond._read_fd is None)
256
    self.assert_(self.cond._write_fd is None)
257
    self.assert_(self.cond._poller is None)
258

    
259
  def testReusage(self):
260
    self.cond.StartWaiting()
261
    self.assert_(self.cond._read_fd is not None)
262
    self.assert_(self.cond._write_fd is not None)
263
    self.assert_(self.cond._poller is not None)
264

    
265
    self.assert_(self.cond.DoneWaiting())
266

    
267
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
268
    self.assert_(self.cond._read_fd is None)
269
    self.assert_(self.cond._write_fd is None)
270
    self.assert_(self.cond._poller is None)
271

    
272
  def testNotifyTwice(self):
273
    self.cond.notifyAll()
274
    self.assertRaises(RuntimeError, self.cond.notifyAll)
275

    
276

    
277
class TestSharedLock(_ThreadedTestCase):
278
  """SharedLock tests"""
279

    
280
  def setUp(self):
281
    _ThreadedTestCase.setUp(self)
282
    self.sl = locking.SharedLock()
283
    # helper threads use the 'done' queue to tell the master they finished.
284
    self.done = Queue.Queue(0)
285

    
286
  def testSequenceAndOwnership(self):
287
    self.assert_(not self.sl._is_owned())
288
    self.sl.acquire(shared=1)
289
    self.assert_(self.sl._is_owned())
290
    self.assert_(self.sl._is_owned(shared=1))
291
    self.assert_(not self.sl._is_owned(shared=0))
292
    self.sl.release()
293
    self.assert_(not self.sl._is_owned())
294
    self.sl.acquire()
295
    self.assert_(self.sl._is_owned())
296
    self.assert_(not self.sl._is_owned(shared=1))
297
    self.assert_(self.sl._is_owned(shared=0))
298
    self.sl.release()
299
    self.assert_(not self.sl._is_owned())
300
    self.sl.acquire(shared=1)
301
    self.assert_(self.sl._is_owned())
302
    self.assert_(self.sl._is_owned(shared=1))
303
    self.assert_(not self.sl._is_owned(shared=0))
304
    self.sl.release()
305
    self.assert_(not self.sl._is_owned())
306

    
307
  def testBooleanValue(self):
308
    # semaphores are supposed to return a true value on a successful acquire
309
    self.assert_(self.sl.acquire(shared=1))
310
    self.sl.release()
311
    self.assert_(self.sl.acquire())
312
    self.sl.release()
313

    
314
  def testDoubleLockingStoE(self):
315
    self.sl.acquire(shared=1)
316
    self.assertRaises(AssertionError, self.sl.acquire)
317

    
318
  def testDoubleLockingEtoS(self):
319
    self.sl.acquire()
320
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
321

    
322
  def testDoubleLockingStoS(self):
323
    self.sl.acquire(shared=1)
324
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
325

    
326
  def testDoubleLockingEtoE(self):
327
    self.sl.acquire()
328
    self.assertRaises(AssertionError, self.sl.acquire)
329

    
330
  # helper functions: called in a separate thread they acquire the lock, send
331
  # their identifier on the done queue, then release it.
332
  def _doItSharer(self):
333
    try:
334
      self.sl.acquire(shared=1)
335
      self.done.put('SHR')
336
      self.sl.release()
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def _doItExclusive(self):
341
    try:
342
      self.sl.acquire()
343
      self.done.put('EXC')
344
      self.sl.release()
345
    except errors.LockError:
346
      self.done.put('ERR')
347

    
348
  def _doItDelete(self):
349
    try:
350
      self.sl.delete()
351
      self.done.put('DEL')
352
    except errors.LockError:
353
      self.done.put('ERR')
354

    
355
  def testSharersCanCoexist(self):
356
    self.sl.acquire(shared=1)
357
    threading.Thread(target=self._doItSharer).start()
358
    self.assert_(self.done.get(True, 1))
359
    self.sl.release()
360

    
361
  @_Repeat
362
  def testExclusiveBlocksExclusive(self):
363
    self.sl.acquire()
364
    self._addThread(target=self._doItExclusive)
365
    self.assertRaises(Queue.Empty, self.done.get_nowait)
366
    self.sl.release()
367
    self._waitThreads()
368
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
369

    
370
  @_Repeat
371
  def testExclusiveBlocksDelete(self):
372
    self.sl.acquire()
373
    self._addThread(target=self._doItDelete)
374
    self.assertRaises(Queue.Empty, self.done.get_nowait)
375
    self.sl.release()
376
    self._waitThreads()
377
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
378
    self.sl = locking.SharedLock()
379

    
380
  @_Repeat
381
  def testExclusiveBlocksSharer(self):
382
    self.sl.acquire()
383
    self._addThread(target=self._doItSharer)
384
    self.assertRaises(Queue.Empty, self.done.get_nowait)
385
    self.sl.release()
386
    self._waitThreads()
387
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
388

    
389
  @_Repeat
390
  def testSharerBlocksExclusive(self):
391
    self.sl.acquire(shared=1)
392
    self._addThread(target=self._doItExclusive)
393
    self.assertRaises(Queue.Empty, self.done.get_nowait)
394
    self.sl.release()
395
    self._waitThreads()
396
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
397

    
398
  @_Repeat
399
  def testSharerBlocksDelete(self):
400
    self.sl.acquire(shared=1)
401
    self._addThread(target=self._doItDelete)
402
    self.assertRaises(Queue.Empty, self.done.get_nowait)
403
    self.sl.release()
404
    self._waitThreads()
405
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
406
    self.sl = locking.SharedLock()
407

    
408
  @_Repeat
409
  def testWaitingExclusiveBlocksSharer(self):
410
    """SKIPPED testWaitingExclusiveBlockSharer"""
411
    return
412

    
413
    self.sl.acquire(shared=1)
414
    # the lock is acquired in shared mode...
415
    self._addThread(target=self._doItExclusive)
416
    # ...but now an exclusive is waiting...
417
    self._addThread(target=self._doItSharer)
418
    # ...so the sharer should be blocked as well
419
    self.assertRaises(Queue.Empty, self.done.get_nowait)
420
    self.sl.release()
421
    self._waitThreads()
422
    # The exclusive passed before
423
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
424
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
425

    
426
  @_Repeat
427
  def testWaitingSharerBlocksExclusive(self):
428
    """SKIPPED testWaitingSharerBlocksExclusive"""
429
    return
430

    
431
    self.sl.acquire()
432
    # the lock is acquired in exclusive mode...
433
    self._addThread(target=self._doItSharer)
434
    # ...but now a sharer is waiting...
435
    self._addThread(target=self._doItExclusive)
436
    # ...the exclusive is waiting too...
437
    self.assertRaises(Queue.Empty, self.done.get_nowait)
438
    self.sl.release()
439
    self._waitThreads()
440
    # The sharer passed before
441
    self.assertEqual(self.done.get_nowait(), 'SHR')
442
    self.assertEqual(self.done.get_nowait(), 'EXC')
443

    
444
  def testDelete(self):
445
    self.sl.delete()
446
    self.assertRaises(errors.LockError, self.sl.acquire)
447
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
448
    self.assertRaises(errors.LockError, self.sl.delete)
449

    
450
  def testDeleteTimeout(self):
451
    self.sl.delete(timeout=60)
452

    
453
  def testNoDeleteIfSharer(self):
454
    self.sl.acquire(shared=1)
455
    self.assertRaises(AssertionError, self.sl.delete)
456

    
457
  @_Repeat
458
  def testDeletePendingSharersExclusiveDelete(self):
459
    self.sl.acquire()
460
    self._addThread(target=self._doItSharer)
461
    self._addThread(target=self._doItSharer)
462
    self._addThread(target=self._doItExclusive)
463
    self._addThread(target=self._doItDelete)
464
    self.sl.delete()
465
    self._waitThreads()
466
    # The threads who were pending return ERR
467
    for _ in range(4):
468
      self.assertEqual(self.done.get_nowait(), 'ERR')
469
    self.sl = locking.SharedLock()
470

    
471
  @_Repeat
472
  def testDeletePendingDeleteExclusiveSharers(self):
473
    self.sl.acquire()
474
    self._addThread(target=self._doItDelete)
475
    self._addThread(target=self._doItExclusive)
476
    self._addThread(target=self._doItSharer)
477
    self._addThread(target=self._doItSharer)
478
    self.sl.delete()
479
    self._waitThreads()
480
    # The two threads who were pending return both ERR
481
    self.assertEqual(self.done.get_nowait(), 'ERR')
482
    self.assertEqual(self.done.get_nowait(), 'ERR')
483
    self.assertEqual(self.done.get_nowait(), 'ERR')
484
    self.assertEqual(self.done.get_nowait(), 'ERR')
485
    self.sl = locking.SharedLock()
486

    
487
  @_Repeat
488
  def testExclusiveAcquireTimeout(self):
489
    def _LockExclusive(wait):
490
      self.sl.acquire(shared=0)
491
      self.done.put("A: start sleep")
492
      time.sleep(wait)
493
      self.done.put("A: end sleep")
494
      self.sl.release()
495

    
496
    for shared in [0, 1]:
497
      # Start thread to hold lock for 20 ms
498
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
499

    
500
      # Wait for sleep to begin
501
      self.assertEqual(self.done.get(), "A: start sleep")
502

    
503
      # Wait up to 100 ms to get lock
504
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
505
      self.done.put("got 2nd")
506
      self.sl.release()
507

    
508
      self._waitThreads()
509

    
510
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
511
      self.assertEqual(self.done.get_nowait(), "got 2nd")
512
      self.assertRaises(Queue.Empty, self.done.get_nowait)
513

    
514
  @_Repeat
515
  def testAcquireExpiringTimeout(self):
516
    def _AcquireWithTimeout(shared, timeout):
517
      if not self.sl.acquire(shared=shared, timeout=timeout):
518
        self.done.put("timeout")
519

    
520
    for shared in [0, 1]:
521
      # Lock exclusively
522
      self.sl.acquire()
523

    
524
      # Start shared acquires with timeout between 0 and 20 ms
525
      for i in xrange(11):
526
        self._addThread(target=_AcquireWithTimeout,
527
                        args=(shared, i * 2.0 / 1000.0))
528

    
529
      # Wait for threads to finish (makes sure the acquire timeout expires
530
      # before releasing the lock)
531
      self._waitThreads()
532

    
533
      # Release lock
534
      self.sl.release()
535

    
536
      for _ in xrange(11):
537
        self.assertEqual(self.done.get_nowait(), "timeout")
538

    
539
      self.assertRaises(Queue.Empty, self.done.get_nowait)
540

    
541
  @_Repeat
542
  def testSharedSkipExclusiveAcquires(self):
543
    # Tests whether shared acquires jump in front of exclusive acquires in the
544
    # queue.
545

    
546
    # Get exclusive lock while we fill the queue
547
    self.sl.acquire()
548

    
549
    def _Acquire(shared, name):
550
      if not self.sl.acquire(shared=shared):
551
        return
552

    
553
      self.done.put(name)
554
      self.sl.release()
555

    
556
    # Start shared acquires
557
    for _ in xrange(5):
558
      self._addThread(target=_Acquire, args=(1, "shared A"))
559

    
560
    # Start exclusive acquires
561
    for _ in xrange(3):
562
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
563

    
564
    # More shared acquires
565
    for _ in xrange(5):
566
      self._addThread(target=_Acquire, args=(1, "shared C"))
567

    
568
    # More exclusive acquires
569
    for _ in xrange(3):
570
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
571

    
572
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
573
    # together. There's no way to wait for SharedLock.acquire to start
574
    # its work. Hence the timeout of 2 seconds.
575
    pending = 0
576
    end_time = time.time() + 2.0
577
    while time.time() < end_time:
578
      pending = self.sl._count_pending()
579
      self.assert_(pending >= 0 and pending <= 7)
580
      if pending == 7:
581
        break
582
      time.sleep(0.05)
583
    self.assertEqual(pending, 7)
584

    
585
    # Release exclusive lock and wait
586
    self.sl.release()
587

    
588
    self._waitThreads()
589

    
590
    # Check sequence
591
    shr_a = 0
592
    shr_c = 0
593
    for _ in xrange(10):
594
      # Shared locks aren't guaranteed to be notified in order, but they'll be
595
      # first
596
      tmp = self.done.get_nowait()
597
      if tmp == "shared A":
598
        shr_a += 1
599
      elif tmp == "shared C":
600
        shr_c += 1
601
    self.assertEqual(shr_a, 5)
602
    self.assertEqual(shr_c, 5)
603

    
604
    for _ in xrange(3):
605
      self.assertEqual(self.done.get_nowait(), "exclusive B")
606

    
607
    for _ in xrange(3):
608
      self.assertEqual(self.done.get_nowait(), "exclusive D")
609

    
610
    self.assertRaises(Queue.Empty, self.done.get_nowait)
611

    
612
  @_Repeat
613
  def testMixedAcquireTimeout(self):
614
    sync = threading.Condition()
615

    
616
    def _AcquireShared(ev):
617
      if not self.sl.acquire(shared=1, timeout=None):
618
        return
619

    
620
      self.done.put("shared")
621

    
622
      # Notify main thread
623
      ev.set()
624

    
625
      # Wait for notification
626
      sync.acquire()
627
      try:
628
        sync.wait()
629
      finally:
630
        sync.release()
631

    
632
      # Release lock
633
      self.sl.release()
634

    
635
    acquires = []
636
    for _ in xrange(3):
637
      ev = threading.Event()
638
      self._addThread(target=_AcquireShared, args=(ev, ))
639
      acquires.append(ev)
640

    
641
    # Wait for all acquires to finish
642
    for i in acquires:
643
      i.wait()
644

    
645
    self.assertEqual(self.sl._count_pending(), 0)
646

    
647
    # Try to get exclusive lock
648
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
649

    
650
    # Acquire exclusive without timeout
651
    exclsync = threading.Condition()
652
    exclev = threading.Event()
653

    
654
    def _AcquireExclusive():
655
      if not self.sl.acquire(shared=0):
656
        return
657

    
658
      self.done.put("exclusive")
659

    
660
      # Notify main thread
661
      exclev.set()
662

    
663
      exclsync.acquire()
664
      try:
665
        exclsync.wait()
666
      finally:
667
        exclsync.release()
668

    
669
      self.sl.release()
670

    
671
    self._addThread(target=_AcquireExclusive)
672

    
673
    # Try to get exclusive lock
674
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
675

    
676
    # Make all shared holders release their locks
677
    sync.acquire()
678
    try:
679
      sync.notifyAll()
680
    finally:
681
      sync.release()
682

    
683
    # Wait for exclusive acquire to succeed
684
    exclev.wait()
685

    
686
    self.assertEqual(self.sl._count_pending(), 0)
687

    
688
    # Try to get exclusive lock
689
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
690

    
691
    def _AcquireSharedSimple():
692
      if self.sl.acquire(shared=1, timeout=None):
693
        self.done.put("shared2")
694
        self.sl.release()
695

    
696
    for _ in xrange(10):
697
      self._addThread(target=_AcquireSharedSimple)
698

    
699
    # Tell exclusive lock to release
700
    exclsync.acquire()
701
    try:
702
      exclsync.notifyAll()
703
    finally:
704
      exclsync.release()
705

    
706
    # Wait for everything to finish
707
    self._waitThreads()
708

    
709
    self.assertEqual(self.sl._count_pending(), 0)
710

    
711
    # Check sequence
712
    for _ in xrange(3):
713
      self.assertEqual(self.done.get_nowait(), "shared")
714

    
715
    self.assertEqual(self.done.get_nowait(), "exclusive")
716

    
717
    for _ in xrange(10):
718
      self.assertEqual(self.done.get_nowait(), "shared2")
719

    
720
    self.assertRaises(Queue.Empty, self.done.get_nowait)
721

    
722

    
723
class TestSSynchronizedDecorator(_ThreadedTestCase):
724
  """Shared Lock Synchronized decorator test"""
725

    
726
  def setUp(self):
727
    _ThreadedTestCase.setUp(self)
728
    # helper threads use the 'done' queue to tell the master they finished.
729
    self.done = Queue.Queue(0)
730

    
731
  @locking.ssynchronized(_decoratorlock)
732
  def _doItExclusive(self):
733
    self.assert_(_decoratorlock._is_owned())
734
    self.done.put('EXC')
735

    
736
  @locking.ssynchronized(_decoratorlock, shared=1)
737
  def _doItSharer(self):
738
    self.assert_(_decoratorlock._is_owned(shared=1))
739
    self.done.put('SHR')
740

    
741
  def testDecoratedFunctions(self):
742
    self._doItExclusive()
743
    self.assert_(not _decoratorlock._is_owned())
744
    self._doItSharer()
745
    self.assert_(not _decoratorlock._is_owned())
746

    
747
  def testSharersCanCoexist(self):
748
    _decoratorlock.acquire(shared=1)
749
    threading.Thread(target=self._doItSharer).start()
750
    self.assert_(self.done.get(True, 1))
751
    _decoratorlock.release()
752

    
753
  @_Repeat
754
  def testExclusiveBlocksExclusive(self):
755
    _decoratorlock.acquire()
756
    self._addThread(target=self._doItExclusive)
757
    # give it a bit of time to check that it's not actually doing anything
758
    self.assertRaises(Queue.Empty, self.done.get_nowait)
759
    _decoratorlock.release()
760
    self._waitThreads()
761
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
762

    
763
  @_Repeat
764
  def testExclusiveBlocksSharer(self):
765
    _decoratorlock.acquire()
766
    self._addThread(target=self._doItSharer)
767
    self.assertRaises(Queue.Empty, self.done.get_nowait)
768
    _decoratorlock.release()
769
    self._waitThreads()
770
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
771

    
772
  @_Repeat
773
  def testSharerBlocksExclusive(self):
774
    _decoratorlock.acquire(shared=1)
775
    self._addThread(target=self._doItExclusive)
776
    self.assertRaises(Queue.Empty, self.done.get_nowait)
777
    _decoratorlock.release()
778
    self._waitThreads()
779
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
780

    
781

    
782
class TestLockSet(_ThreadedTestCase):
783
  """LockSet tests"""
784

    
785
  def setUp(self):
786
    _ThreadedTestCase.setUp(self)
787
    self._setUpLS()
788
    # helper threads use the 'done' queue to tell the master they finished.
789
    self.done = Queue.Queue(0)
790

    
791
  def _setUpLS(self):
792
    """Helper to (re)initialize the lock set"""
793
    self.resources = ['one', 'two', 'three']
794
    self.ls = locking.LockSet(members=self.resources)
795

    
796
  def testResources(self):
797
    self.assertEquals(self.ls._names(), set(self.resources))
798
    newls = locking.LockSet()
799
    self.assertEquals(newls._names(), set())
800

    
801
  def testAcquireRelease(self):
802
    self.assert_(self.ls.acquire('one'))
803
    self.assertEquals(self.ls._list_owned(), set(['one']))
804
    self.ls.release()
805
    self.assertEquals(self.ls._list_owned(), set())
806
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
807
    self.assertEquals(self.ls._list_owned(), set(['one']))
808
    self.ls.release()
809
    self.assertEquals(self.ls._list_owned(), set())
810
    self.ls.acquire(['one', 'two', 'three'])
811
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
812
    self.ls.release('one')
813
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
814
    self.ls.release(['three'])
815
    self.assertEquals(self.ls._list_owned(), set(['two']))
816
    self.ls.release()
817
    self.assertEquals(self.ls._list_owned(), set())
818
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
819
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
820
    self.ls.release()
821
    self.assertEquals(self.ls._list_owned(), set())
822

    
823
  def testNoDoubleAcquire(self):
824
    self.ls.acquire('one')
825
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
826
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
827
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
828
    self.ls.release()
829
    self.ls.acquire(['one', 'three'])
830
    self.ls.release('one')
831
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
832
    self.ls.release('three')
833

    
834
  def testNoWrongRelease(self):
835
    self.assertRaises(AssertionError, self.ls.release)
836
    self.ls.acquire('one')
837
    self.assertRaises(AssertionError, self.ls.release, 'two')
838

    
839
  def testAddRemove(self):
840
    self.ls.add('four')
841
    self.assertEquals(self.ls._list_owned(), set())
842
    self.assert_('four' in self.ls._names())
843
    self.ls.add(['five', 'six', 'seven'], acquired=1)
844
    self.assert_('five' in self.ls._names())
845
    self.assert_('six' in self.ls._names())
846
    self.assert_('seven' in self.ls._names())
847
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
848
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
849
    self.assert_('five' not in self.ls._names())
850
    self.assert_('six' not in self.ls._names())
851
    self.assertEquals(self.ls._list_owned(), set(['seven']))
852
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
853
    self.ls.remove('seven')
854
    self.assert_('seven' not in self.ls._names())
855
    self.assertEquals(self.ls._list_owned(), set([]))
856
    self.ls.acquire(None, shared=1)
857
    self.assertRaises(AssertionError, self.ls.add, 'eight')
858
    self.ls.release()
859
    self.ls.acquire(None)
860
    self.ls.add('eight', acquired=1)
861
    self.assert_('eight' in self.ls._names())
862
    self.assert_('eight' in self.ls._list_owned())
863
    self.ls.add('nine')
864
    self.assert_('nine' in self.ls._names())
865
    self.assert_('nine' not in self.ls._list_owned())
866
    self.ls.release()
867
    self.ls.remove(['two'])
868
    self.assert_('two' not in self.ls._names())
869
    self.ls.acquire('three')
870
    self.assertEquals(self.ls.remove(['three']), ['three'])
871
    self.assert_('three' not in self.ls._names())
872
    self.assertEquals(self.ls.remove('three'), [])
873
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
874
    self.assert_('one' not in self.ls._names())
875

    
876
  def testRemoveNonBlocking(self):
877
    self.ls.acquire('one')
878
    self.assertEquals(self.ls.remove('one'), ['one'])
879
    self.ls.acquire(['two', 'three'])
880
    self.assertEquals(self.ls.remove(['two', 'three']),
881
                      ['two', 'three'])
882

    
883
  def testNoDoubleAdd(self):
884
    self.assertRaises(errors.LockError, self.ls.add, 'two')
885
    self.ls.add('four')
886
    self.assertRaises(errors.LockError, self.ls.add, 'four')
887

    
888
  def testNoWrongRemoves(self):
889
    self.ls.acquire(['one', 'three'], shared=1)
890
    # Cannot remove 'two' while holding something which is not a superset
891
    self.assertRaises(AssertionError, self.ls.remove, 'two')
892
    # Cannot remove 'three' as we are sharing it
893
    self.assertRaises(AssertionError, self.ls.remove, 'three')
894

    
895
  def testAcquireSetLock(self):
896
    # acquire the set-lock exclusively
897
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
898
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
899
    self.assertEquals(self.ls._is_owned(), True)
900
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
901
    # I can still add/remove elements...
902
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
903
    self.assert_(self.ls.add('six'))
904
    self.ls.release()
905
    # share the set-lock
906
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
907
    # adding new elements is not possible
908
    self.assertRaises(AssertionError, self.ls.add, 'five')
909
    self.ls.release()
910

    
911
  def testAcquireWithRepetitions(self):
912
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
913
                      set(['two', 'two', 'three']))
914
    self.ls.release(['two', 'two'])
915
    self.assertEquals(self.ls._list_owned(), set(['three']))
916

    
917
  def testEmptyAcquire(self):
918
    # Acquire an empty list of locks...
919
    self.assertEquals(self.ls.acquire([]), set())
920
    self.assertEquals(self.ls._list_owned(), set())
921
    # New locks can still be addded
922
    self.assert_(self.ls.add('six'))
923
    # "re-acquiring" is not an issue, since we had really acquired nothing
924
    self.assertEquals(self.ls.acquire([], shared=1), set())
925
    self.assertEquals(self.ls._list_owned(), set())
926
    # We haven't really acquired anything, so we cannot release
927
    self.assertRaises(AssertionError, self.ls.release)
928

    
929
  def _doLockSet(self, names, shared):
930
    try:
931
      self.ls.acquire(names, shared=shared)
932
      self.done.put('DONE')
933
      self.ls.release()
934
    except errors.LockError:
935
      self.done.put('ERR')
936

    
937
  def _doAddSet(self, names):
938
    try:
939
      self.ls.add(names, acquired=1)
940
      self.done.put('DONE')
941
      self.ls.release()
942
    except errors.LockError:
943
      self.done.put('ERR')
944

    
945
  def _doRemoveSet(self, names):
946
    self.done.put(self.ls.remove(names))
947

    
948
  @_Repeat
949
  def testConcurrentSharedAcquire(self):
950
    self.ls.acquire(['one', 'two'], shared=1)
951
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
952
    self._waitThreads()
953
    self.assertEqual(self.done.get_nowait(), 'DONE')
954
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
955
    self._waitThreads()
956
    self.assertEqual(self.done.get_nowait(), 'DONE')
957
    self._addThread(target=self._doLockSet, args=('three', 1))
958
    self._waitThreads()
959
    self.assertEqual(self.done.get_nowait(), 'DONE')
960
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
961
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
962
    self.assertRaises(Queue.Empty, self.done.get_nowait)
963
    self.ls.release()
964
    self._waitThreads()
965
    self.assertEqual(self.done.get_nowait(), 'DONE')
966
    self.assertEqual(self.done.get_nowait(), 'DONE')
967

    
968
  @_Repeat
969
  def testConcurrentExclusiveAcquire(self):
970
    self.ls.acquire(['one', 'two'])
971
    self._addThread(target=self._doLockSet, args=('three', 1))
972
    self._waitThreads()
973
    self.assertEqual(self.done.get_nowait(), 'DONE')
974
    self._addThread(target=self._doLockSet, args=('three', 0))
975
    self._waitThreads()
976
    self.assertEqual(self.done.get_nowait(), 'DONE')
977
    self.assertRaises(Queue.Empty, self.done.get_nowait)
978
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
979
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
980
    self._addThread(target=self._doLockSet, args=('one', 0))
981
    self._addThread(target=self._doLockSet, args=('one', 1))
982
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
983
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
984
    self.assertRaises(Queue.Empty, self.done.get_nowait)
985
    self.ls.release()
986
    self._waitThreads()
987
    for _ in range(6):
988
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
989

    
990
  @_Repeat
991
  def testConcurrentRemove(self):
992
    self.ls.add('four')
993
    self.ls.acquire(['one', 'two', 'four'])
994
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
995
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
996
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
997
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
998
    self.assertRaises(Queue.Empty, self.done.get_nowait)
999
    self.ls.remove('one')
1000
    self.ls.release()
1001
    self._waitThreads()
1002
    for i in range(4):
1003
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1004
    self.ls.add(['five', 'six'], acquired=1)
1005
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1006
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1007
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1008
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1009
    self.ls.remove('five')
1010
    self.ls.release()
1011
    self._waitThreads()
1012
    for i in range(4):
1013
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1014
    self.ls.acquire(['three', 'four'])
1015
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1016
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1017
    self.ls.remove('four')
1018
    self._waitThreads()
1019
    self.assertEqual(self.done.get_nowait(), ['six'])
1020
    self._addThread(target=self._doRemoveSet, args=(['two']))
1021
    self._waitThreads()
1022
    self.assertEqual(self.done.get_nowait(), ['two'])
1023
    self.ls.release()
1024
    # reset lockset
1025
    self._setUpLS()
1026

    
1027
  @_Repeat
1028
  def testConcurrentSharedSetLock(self):
1029
    # share the set-lock...
1030
    self.ls.acquire(None, shared=1)
1031
    # ...another thread can share it too
1032
    self._addThread(target=self._doLockSet, args=(None, 1))
1033
    self._waitThreads()
1034
    self.assertEqual(self.done.get_nowait(), 'DONE')
1035
    # ...or just share some elements
1036
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1037
    self._waitThreads()
1038
    self.assertEqual(self.done.get_nowait(), 'DONE')
1039
    # ...but not add new ones or remove any
1040
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1041
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1042
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1043
    # this just releases the set-lock
1044
    self.ls.release([])
1045
    t.join(60)
1046
    self.assertEqual(self.done.get_nowait(), 'DONE')
1047
    # release the lock on the actual elements so remove() can proceed too
1048
    self.ls.release()
1049
    self._waitThreads()
1050
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1051
    # reset lockset
1052
    self._setUpLS()
1053

    
1054
  @_Repeat
1055
  def testConcurrentExclusiveSetLock(self):
1056
    # acquire the set-lock...
1057
    self.ls.acquire(None, shared=0)
1058
    # ...no one can do anything else
1059
    self._addThread(target=self._doLockSet, args=(None, 1))
1060
    self._addThread(target=self._doLockSet, args=(None, 0))
1061
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1062
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1063
    self._addThread(target=self._doAddSet, args=(['nine']))
1064
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1065
    self.ls.release()
1066
    self._waitThreads()
1067
    for _ in range(5):
1068
      self.assertEqual(self.done.get(True, 1), 'DONE')
1069
    # cleanup
1070
    self._setUpLS()
1071

    
1072
  @_Repeat
1073
  def testConcurrentSetLockAdd(self):
1074
    self.ls.acquire('one')
1075
    # Another thread wants the whole SetLock
1076
    self._addThread(target=self._doLockSet, args=(None, 0))
1077
    self._addThread(target=self._doLockSet, args=(None, 1))
1078
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1079
    self.assertRaises(AssertionError, self.ls.add, 'four')
1080
    self.ls.release()
1081
    self._waitThreads()
1082
    self.assertEqual(self.done.get_nowait(), 'DONE')
1083
    self.assertEqual(self.done.get_nowait(), 'DONE')
1084
    self.ls.acquire(None)
1085
    self._addThread(target=self._doLockSet, args=(None, 0))
1086
    self._addThread(target=self._doLockSet, args=(None, 1))
1087
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1088
    self.ls.add('four')
1089
    self.ls.add('five', acquired=1)
1090
    self.ls.add('six', acquired=1, shared=1)
1091
    self.assertEquals(self.ls._list_owned(),
1092
      set(['one', 'two', 'three', 'five', 'six']))
1093
    self.assertEquals(self.ls._is_owned(), True)
1094
    self.assertEquals(self.ls._names(),
1095
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1096
    self.ls.release()
1097
    self._waitThreads()
1098
    self.assertEqual(self.done.get_nowait(), 'DONE')
1099
    self.assertEqual(self.done.get_nowait(), 'DONE')
1100
    self._setUpLS()
1101

    
1102
  @_Repeat
1103
  def testEmptyLockSet(self):
1104
    # get the set-lock
1105
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1106
    # now empty it...
1107
    self.ls.remove(['one', 'two', 'three'])
1108
    # and adds/locks by another thread still wait
1109
    self._addThread(target=self._doAddSet, args=(['nine']))
1110
    self._addThread(target=self._doLockSet, args=(None, 1))
1111
    self._addThread(target=self._doLockSet, args=(None, 0))
1112
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1113
    self.ls.release()
1114
    self._waitThreads()
1115
    for _ in range(3):
1116
      self.assertEqual(self.done.get_nowait(), 'DONE')
1117
    # empty it again...
1118
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1119
    # now share it...
1120
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1121
    # other sharers can go, adds still wait
1122
    self._addThread(target=self._doLockSet, args=(None, 1))
1123
    self._waitThreads()
1124
    self.assertEqual(self.done.get_nowait(), 'DONE')
1125
    self._addThread(target=self._doAddSet, args=(['nine']))
1126
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1127
    self.ls.release()
1128
    self._waitThreads()
1129
    self.assertEqual(self.done.get_nowait(), 'DONE')
1130
    self._setUpLS()
1131

    
1132

    
1133
class TestGanetiLockManager(_ThreadedTestCase):
1134

    
1135
  def setUp(self):
1136
    _ThreadedTestCase.setUp(self)
1137
    self.nodes=['n1', 'n2']
1138
    self.instances=['i1', 'i2', 'i3']
1139
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1140
                                        instances=self.instances)
1141
    self.done = Queue.Queue(0)
1142

    
1143
  def tearDown(self):
1144
    # Don't try this at home...
1145
    locking.GanetiLockManager._instance = None
1146

    
1147
  def testLockingConstants(self):
1148
    # The locking library internally cheats by assuming its constants have some
1149
    # relationships with each other. Check those hold true.
1150
    # This relationship is also used in the Processor to recursively acquire
1151
    # the right locks. Again, please don't break it.
1152
    for i in range(len(locking.LEVELS)):
1153
      self.assertEqual(i, locking.LEVELS[i])
1154

    
1155
  def testDoubleGLFails(self):
1156
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1157

    
1158
  def testLockNames(self):
1159
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1160
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1161
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1162
                     set(self.instances))
1163

    
1164
  def testInitAndResources(self):
1165
    locking.GanetiLockManager._instance = None
1166
    self.GL = locking.GanetiLockManager()
1167
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1168
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1169
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1170

    
1171
    locking.GanetiLockManager._instance = None
1172
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1173
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1174
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1175
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1176

    
1177
    locking.GanetiLockManager._instance = None
1178
    self.GL = locking.GanetiLockManager(instances=self.instances)
1179
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1180
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1181
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1182
                     set(self.instances))
1183

    
1184
  def testAcquireRelease(self):
1185
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1186
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1187
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1188
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1189
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1190
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1191
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1192
    self.GL.release(locking.LEVEL_NODE)
1193
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1194
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1195
    self.GL.release(locking.LEVEL_INSTANCE)
1196
    self.assertRaises(errors.LockError, self.GL.acquire,
1197
                      locking.LEVEL_INSTANCE, ['i5'])
1198
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1199
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1200

    
1201
  def testAcquireWholeSets(self):
1202
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1203
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1204
                      set(self.instances))
1205
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1206
                      set(self.instances))
1207
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1208
                      set(self.nodes))
1209
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1210
                      set(self.nodes))
1211
    self.GL.release(locking.LEVEL_NODE)
1212
    self.GL.release(locking.LEVEL_INSTANCE)
1213
    self.GL.release(locking.LEVEL_CLUSTER)
1214

    
1215
  def testAcquireWholeAndPartial(self):
1216
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1217
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1218
                      set(self.instances))
1219
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1220
                      set(self.instances))
1221
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1222
                      set(['n2']))
1223
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1224
                      set(['n2']))
1225
    self.GL.release(locking.LEVEL_NODE)
1226
    self.GL.release(locking.LEVEL_INSTANCE)
1227
    self.GL.release(locking.LEVEL_CLUSTER)
1228

    
1229
  def testBGLDependency(self):
1230
    self.assertRaises(AssertionError, self.GL.acquire,
1231
                      locking.LEVEL_NODE, ['n1', 'n2'])
1232
    self.assertRaises(AssertionError, self.GL.acquire,
1233
                      locking.LEVEL_INSTANCE, ['i3'])
1234
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1235
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1236
    self.assertRaises(AssertionError, self.GL.release,
1237
                      locking.LEVEL_CLUSTER, ['BGL'])
1238
    self.assertRaises(AssertionError, self.GL.release,
1239
                      locking.LEVEL_CLUSTER)
1240
    self.GL.release(locking.LEVEL_NODE)
1241
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1242
    self.assertRaises(AssertionError, self.GL.release,
1243
                      locking.LEVEL_CLUSTER, ['BGL'])
1244
    self.assertRaises(AssertionError, self.GL.release,
1245
                      locking.LEVEL_CLUSTER)
1246
    self.GL.release(locking.LEVEL_INSTANCE)
1247

    
1248
  def testWrongOrder(self):
1249
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1250
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1251
    self.assertRaises(AssertionError, self.GL.acquire,
1252
                      locking.LEVEL_NODE, ['n1'])
1253
    self.assertRaises(AssertionError, self.GL.acquire,
1254
                      locking.LEVEL_INSTANCE, ['i2'])
1255

    
1256
  # Helper function to run as a thread that shared the BGL and then acquires
1257
  # some locks at another level.
1258
  def _doLock(self, level, names, shared):
1259
    try:
1260
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1261
      self.GL.acquire(level, names, shared=shared)
1262
      self.done.put('DONE')
1263
      self.GL.release(level)
1264
      self.GL.release(locking.LEVEL_CLUSTER)
1265
    except errors.LockError:
1266
      self.done.put('ERR')
1267

    
1268
  @_Repeat
1269
  def testConcurrency(self):
1270
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1271
    self._addThread(target=self._doLock,
1272
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1273
    self._waitThreads()
1274
    self.assertEqual(self.done.get_nowait(), 'DONE')
1275
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1276
    self._addThread(target=self._doLock,
1277
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1278
    self._waitThreads()
1279
    self.assertEqual(self.done.get_nowait(), 'DONE')
1280
    self._addThread(target=self._doLock,
1281
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1282
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1283
    self.GL.release(locking.LEVEL_INSTANCE)
1284
    self._waitThreads()
1285
    self.assertEqual(self.done.get_nowait(), 'DONE')
1286
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1287
    self._addThread(target=self._doLock,
1288
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1289
    self._waitThreads()
1290
    self.assertEqual(self.done.get_nowait(), 'DONE')
1291
    self._addThread(target=self._doLock,
1292
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1293
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1294
    self.GL.release(locking.LEVEL_INSTANCE)
1295
    self._waitThreads()
1296
    self.assertEqual(self.done.get(True, 1), 'DONE')
1297
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1298

    
1299

    
1300
if __name__ == '__main__':
1301
  unittest.main()
1302
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1303
  #unittest.TextTestRunner(verbosity=2).run(suite)