Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 63f2e724

History | View | Annotate | Download (43.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.done = Queue.Queue(0)
56
    self.threads = []
57

    
58
  def _addThread(self, *args, **kwargs):
59
    """Create and remember a new thread"""
60
    t = threading.Thread(*args, **kwargs)
61
    self.threads.append(t)
62
    t.start()
63
    return t
64

    
65
  def _waitThreads(self):
66
    """Wait for all our threads to finish"""
67
    for t in self.threads:
68
      t.join(60)
69
      self.failIf(t.isAlive())
70
    self.threads = []
71

    
72

    
73
class TestPipeCondition(_ThreadedTestCase):
74
  """_PipeCondition tests"""
75

    
76
  def setUp(self):
77
    _ThreadedTestCase.setUp(self)
78
    self.lock = threading.Lock()
79
    self.cond = locking._PipeCondition(self.lock)
80

    
81
  def testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def testNotification(self):
97
    def _NotifyAll():
98
      self.cond.acquire()
99
      self.cond.notifyAll()
100
      self.cond.release()
101

    
102
    self.cond.acquire()
103
    self._addThread(target=_NotifyAll)
104
    self.cond.wait()
105
    self.assert_(self.cond._is_owned())
106
    self.cond.release()
107
    self.assert_(not self.cond._is_owned())
108

    
109
  def _TestWait(self, fn):
110
    self._addThread(target=fn)
111
    self._addThread(target=fn)
112
    self._addThread(target=fn)
113

    
114
    # Wait for threads to be waiting
115
    self.assertEqual(self.done.get(True, 1), "A")
116
    self.assertEqual(self.done.get(True, 1), "A")
117
    self.assertEqual(self.done.get(True, 1), "A")
118

    
119
    self.assertRaises(Queue.Empty, self.done.get_nowait)
120

    
121
    self.cond.acquire()
122
    self.assertEqual(self.cond._nwaiters, 3)
123
    # This new thread can"t acquire the lock, and thus call wait, before we
124
    # release it
125
    self._addThread(target=fn)
126
    self.cond.notifyAll()
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.release()
129

    
130
    # We should now get 3 W and 1 A (for the new thread) in whatever order
131
    w = 0
132
    a = 0
133
    for i in range(4):
134
      got = self.done.get(True, 1)
135
      if got == "W":
136
        w += 1
137
      elif got == "A":
138
        a += 1
139
      else:
140
        self.fail("Got %s on the done queue" % got)
141

    
142
    self.assertEqual(w, 3)
143
    self.assertEqual(a, 1)
144

    
145
    self.cond.acquire()
146
    self.cond.notifyAll()
147
    self.cond.release()
148
    self._waitThreads()
149
    self.assertEqual(self.done.get_nowait(), "W")
150
    self.assertRaises(Queue.Empty, self.done.get_nowait)
151

    
152
  def testBlockingWait(self):
153
    def _BlockingWait():
154
      self.cond.acquire()
155
      self.done.put("A")
156
      self.cond.wait()
157
      self.cond.release()
158
      self.done.put("W")
159

    
160
    self._TestWait(_BlockingWait)
161

    
162
  def testLongTimeoutWait(self):
163
    def _Helper():
164
      self.cond.acquire()
165
      self.done.put("A")
166
      self.cond.wait(15.0)
167
      self.cond.release()
168
      self.done.put("W")
169

    
170
    self._TestWait(_Helper)
171

    
172
  def _TimeoutWait(self, timeout, check):
173
    self.cond.acquire()
174
    self.cond.wait(timeout)
175
    self.cond.release()
176
    self.done.put(check)
177

    
178
  def testShortTimeoutWait(self):
179
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
180
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
181
    self._waitThreads()
182
    self.assertEqual(self.done.get_nowait(), "T1")
183
    self.assertEqual(self.done.get_nowait(), "T1")
184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
185

    
186
  def testZeroTimeoutWait(self):
187
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
188
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
189
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
190
    self._waitThreads()
191
    self.assertEqual(self.done.get_nowait(), "T0")
192
    self.assertEqual(self.done.get_nowait(), "T0")
193
    self.assertEqual(self.done.get_nowait(), "T0")
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195

    
196

    
197
class TestSingleActionPipeCondition(unittest.TestCase):
198
  """_SingleActionPipeCondition tests"""
199

    
200
  def setUp(self):
201
    self.cond = locking._SingleActionPipeCondition()
202

    
203
  def testInitialization(self):
204
    self.assert_(self.cond._read_fd is not None)
205
    self.assert_(self.cond._write_fd is not None)
206
    self.assert_(self.cond._poller is not None)
207
    self.assertEqual(self.cond._nwaiters, 0)
208

    
209
  def testUsageCount(self):
210
    self.cond.StartWaiting()
211
    self.assert_(self.cond._read_fd is not None)
212
    self.assert_(self.cond._write_fd is not None)
213
    self.assert_(self.cond._poller is not None)
214
    self.assertEqual(self.cond._nwaiters, 1)
215

    
216
    # use again
217
    self.cond.StartWaiting()
218
    self.assertEqual(self.cond._nwaiters, 2)
219

    
220
    # there is more than one user
221
    self.assert_(not self.cond.DoneWaiting())
222
    self.assert_(self.cond._read_fd is not None)
223
    self.assert_(self.cond._write_fd is not None)
224
    self.assert_(self.cond._poller is not None)
225
    self.assertEqual(self.cond._nwaiters, 1)
226

    
227
    self.assert_(self.cond.DoneWaiting())
228
    self.assertEqual(self.cond._nwaiters, 0)
229
    self.assert_(self.cond._read_fd is None)
230
    self.assert_(self.cond._write_fd is None)
231
    self.assert_(self.cond._poller is None)
232

    
233
  def testNotify(self):
234
    wait1 = self.cond.StartWaiting()
235
    wait2 = self.cond.StartWaiting()
236

    
237
    self.assert_(self.cond._read_fd is not None)
238
    self.assert_(self.cond._write_fd is not None)
239
    self.assert_(self.cond._poller is not None)
240

    
241
    self.cond.notifyAll()
242

    
243
    self.assert_(self.cond._read_fd is not None)
244
    self.assert_(self.cond._write_fd is None)
245
    self.assert_(self.cond._poller is not None)
246

    
247
    self.assert_(not self.cond.DoneWaiting())
248

    
249
    self.assert_(self.cond._read_fd is not None)
250
    self.assert_(self.cond._write_fd is None)
251
    self.assert_(self.cond._poller is not None)
252

    
253
    self.assert_(self.cond.DoneWaiting())
254

    
255
    self.assert_(self.cond._read_fd is None)
256
    self.assert_(self.cond._write_fd is None)
257
    self.assert_(self.cond._poller is None)
258

    
259
  def testReusage(self):
260
    self.cond.StartWaiting()
261
    self.assert_(self.cond._read_fd is not None)
262
    self.assert_(self.cond._write_fd is not None)
263
    self.assert_(self.cond._poller is not None)
264

    
265
    self.assert_(self.cond.DoneWaiting())
266

    
267
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
268
    self.assert_(self.cond._read_fd is None)
269
    self.assert_(self.cond._write_fd is None)
270
    self.assert_(self.cond._poller is None)
271

    
272
  def testNotifyTwice(self):
273
    self.cond.notifyAll()
274
    self.assertRaises(RuntimeError, self.cond.notifyAll)
275

    
276

    
277
class TestSharedLock(_ThreadedTestCase):
278
  """SharedLock tests"""
279

    
280
  def setUp(self):
281
    _ThreadedTestCase.setUp(self)
282
    self.sl = locking.SharedLock()
283

    
284
  def testSequenceAndOwnership(self):
285
    self.assert_(not self.sl._is_owned())
286
    self.sl.acquire(shared=1)
287
    self.assert_(self.sl._is_owned())
288
    self.assert_(self.sl._is_owned(shared=1))
289
    self.assert_(not self.sl._is_owned(shared=0))
290
    self.sl.release()
291
    self.assert_(not self.sl._is_owned())
292
    self.sl.acquire()
293
    self.assert_(self.sl._is_owned())
294
    self.assert_(not self.sl._is_owned(shared=1))
295
    self.assert_(self.sl._is_owned(shared=0))
296
    self.sl.release()
297
    self.assert_(not self.sl._is_owned())
298
    self.sl.acquire(shared=1)
299
    self.assert_(self.sl._is_owned())
300
    self.assert_(self.sl._is_owned(shared=1))
301
    self.assert_(not self.sl._is_owned(shared=0))
302
    self.sl.release()
303
    self.assert_(not self.sl._is_owned())
304

    
305
  def testBooleanValue(self):
306
    # semaphores are supposed to return a true value on a successful acquire
307
    self.assert_(self.sl.acquire(shared=1))
308
    self.sl.release()
309
    self.assert_(self.sl.acquire())
310
    self.sl.release()
311

    
312
  def testDoubleLockingStoE(self):
313
    self.sl.acquire(shared=1)
314
    self.assertRaises(AssertionError, self.sl.acquire)
315

    
316
  def testDoubleLockingEtoS(self):
317
    self.sl.acquire()
318
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
319

    
320
  def testDoubleLockingStoS(self):
321
    self.sl.acquire(shared=1)
322
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
323

    
324
  def testDoubleLockingEtoE(self):
325
    self.sl.acquire()
326
    self.assertRaises(AssertionError, self.sl.acquire)
327

    
328
  # helper functions: called in a separate thread they acquire the lock, send
329
  # their identifier on the done queue, then release it.
330
  def _doItSharer(self):
331
    try:
332
      self.sl.acquire(shared=1)
333
      self.done.put('SHR')
334
      self.sl.release()
335
    except errors.LockError:
336
      self.done.put('ERR')
337

    
338
  def _doItExclusive(self):
339
    try:
340
      self.sl.acquire()
341
      self.done.put('EXC')
342
      self.sl.release()
343
    except errors.LockError:
344
      self.done.put('ERR')
345

    
346
  def _doItDelete(self):
347
    try:
348
      self.sl.delete()
349
      self.done.put('DEL')
350
    except errors.LockError:
351
      self.done.put('ERR')
352

    
353
  def testSharersCanCoexist(self):
354
    self.sl.acquire(shared=1)
355
    threading.Thread(target=self._doItSharer).start()
356
    self.assert_(self.done.get(True, 1))
357
    self.sl.release()
358

    
359
  @_Repeat
360
  def testExclusiveBlocksExclusive(self):
361
    self.sl.acquire()
362
    self._addThread(target=self._doItExclusive)
363
    self.assertRaises(Queue.Empty, self.done.get_nowait)
364
    self.sl.release()
365
    self._waitThreads()
366
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
367

    
368
  @_Repeat
369
  def testExclusiveBlocksDelete(self):
370
    self.sl.acquire()
371
    self._addThread(target=self._doItDelete)
372
    self.assertRaises(Queue.Empty, self.done.get_nowait)
373
    self.sl.release()
374
    self._waitThreads()
375
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
376
    self.sl = locking.SharedLock()
377

    
378
  @_Repeat
379
  def testExclusiveBlocksSharer(self):
380
    self.sl.acquire()
381
    self._addThread(target=self._doItSharer)
382
    self.assertRaises(Queue.Empty, self.done.get_nowait)
383
    self.sl.release()
384
    self._waitThreads()
385
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
386

    
387
  @_Repeat
388
  def testSharerBlocksExclusive(self):
389
    self.sl.acquire(shared=1)
390
    self._addThread(target=self._doItExclusive)
391
    self.assertRaises(Queue.Empty, self.done.get_nowait)
392
    self.sl.release()
393
    self._waitThreads()
394
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
395

    
396
  @_Repeat
397
  def testSharerBlocksDelete(self):
398
    self.sl.acquire(shared=1)
399
    self._addThread(target=self._doItDelete)
400
    self.assertRaises(Queue.Empty, self.done.get_nowait)
401
    self.sl.release()
402
    self._waitThreads()
403
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
404
    self.sl = locking.SharedLock()
405

    
406
  @_Repeat
407
  def testWaitingExclusiveBlocksSharer(self):
408
    """SKIPPED testWaitingExclusiveBlockSharer"""
409
    return
410

    
411
    self.sl.acquire(shared=1)
412
    # the lock is acquired in shared mode...
413
    self._addThread(target=self._doItExclusive)
414
    # ...but now an exclusive is waiting...
415
    self._addThread(target=self._doItSharer)
416
    # ...so the sharer should be blocked as well
417
    self.assertRaises(Queue.Empty, self.done.get_nowait)
418
    self.sl.release()
419
    self._waitThreads()
420
    # The exclusive passed before
421
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
422
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
423

    
424
  @_Repeat
425
  def testWaitingSharerBlocksExclusive(self):
426
    """SKIPPED testWaitingSharerBlocksExclusive"""
427
    return
428

    
429
    self.sl.acquire()
430
    # the lock is acquired in exclusive mode...
431
    self._addThread(target=self._doItSharer)
432
    # ...but now a sharer is waiting...
433
    self._addThread(target=self._doItExclusive)
434
    # ...the exclusive is waiting too...
435
    self.assertRaises(Queue.Empty, self.done.get_nowait)
436
    self.sl.release()
437
    self._waitThreads()
438
    # The sharer passed before
439
    self.assertEqual(self.done.get_nowait(), 'SHR')
440
    self.assertEqual(self.done.get_nowait(), 'EXC')
441

    
442
  def testDelete(self):
443
    self.sl.delete()
444
    self.assertRaises(errors.LockError, self.sl.acquire)
445
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
446
    self.assertRaises(errors.LockError, self.sl.delete)
447

    
448
  def testDeleteTimeout(self):
449
    self.sl.delete(timeout=60)
450

    
451
  def testNoDeleteIfSharer(self):
452
    self.sl.acquire(shared=1)
453
    self.assertRaises(AssertionError, self.sl.delete)
454

    
455
  @_Repeat
456
  def testDeletePendingSharersExclusiveDelete(self):
457
    self.sl.acquire()
458
    self._addThread(target=self._doItSharer)
459
    self._addThread(target=self._doItSharer)
460
    self._addThread(target=self._doItExclusive)
461
    self._addThread(target=self._doItDelete)
462
    self.sl.delete()
463
    self._waitThreads()
464
    # The threads who were pending return ERR
465
    for _ in range(4):
466
      self.assertEqual(self.done.get_nowait(), 'ERR')
467
    self.sl = locking.SharedLock()
468

    
469
  @_Repeat
470
  def testDeletePendingDeleteExclusiveSharers(self):
471
    self.sl.acquire()
472
    self._addThread(target=self._doItDelete)
473
    self._addThread(target=self._doItExclusive)
474
    self._addThread(target=self._doItSharer)
475
    self._addThread(target=self._doItSharer)
476
    self.sl.delete()
477
    self._waitThreads()
478
    # The two threads who were pending return both ERR
479
    self.assertEqual(self.done.get_nowait(), 'ERR')
480
    self.assertEqual(self.done.get_nowait(), 'ERR')
481
    self.assertEqual(self.done.get_nowait(), 'ERR')
482
    self.assertEqual(self.done.get_nowait(), 'ERR')
483
    self.sl = locking.SharedLock()
484

    
485
  @_Repeat
486
  def testExclusiveAcquireTimeout(self):
487
    def _LockExclusive(wait):
488
      self.sl.acquire(shared=0)
489
      self.done.put("A: start sleep")
490
      time.sleep(wait)
491
      self.done.put("A: end sleep")
492
      self.sl.release()
493

    
494
    for shared in [0, 1]:
495
      # Start thread to hold lock for 20 ms
496
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
497

    
498
      # Wait for sleep to begin
499
      self.assertEqual(self.done.get(), "A: start sleep")
500

    
501
      # Wait up to 100 ms to get lock
502
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
503
      self.done.put("got 2nd")
504
      self.sl.release()
505

    
506
      self._waitThreads()
507

    
508
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
509
      self.assertEqual(self.done.get_nowait(), "got 2nd")
510
      self.assertRaises(Queue.Empty, self.done.get_nowait)
511

    
512
  @_Repeat
513
  def testAcquireExpiringTimeout(self):
514
    def _AcquireWithTimeout(shared, timeout):
515
      if not self.sl.acquire(shared=shared, timeout=timeout):
516
        self.done.put("timeout")
517

    
518
    for shared in [0, 1]:
519
      # Lock exclusively
520
      self.sl.acquire()
521

    
522
      # Start shared acquires with timeout between 0 and 20 ms
523
      for i in xrange(11):
524
        self._addThread(target=_AcquireWithTimeout,
525
                        args=(shared, i * 2.0 / 1000.0))
526

    
527
      # Wait for threads to finish (makes sure the acquire timeout expires
528
      # before releasing the lock)
529
      self._waitThreads()
530

    
531
      # Release lock
532
      self.sl.release()
533

    
534
      for _ in xrange(11):
535
        self.assertEqual(self.done.get_nowait(), "timeout")
536

    
537
      self.assertRaises(Queue.Empty, self.done.get_nowait)
538

    
539
  @_Repeat
540
  def testSharedSkipExclusiveAcquires(self):
541
    # Tests whether shared acquires jump in front of exclusive acquires in the
542
    # queue.
543

    
544
    # Get exclusive lock while we fill the queue
545
    self.sl.acquire()
546

    
547
    def _Acquire(shared, name):
548
      if not self.sl.acquire(shared=shared):
549
        return
550

    
551
      self.done.put(name)
552
      self.sl.release()
553

    
554
    # Start shared acquires
555
    for _ in xrange(5):
556
      self._addThread(target=_Acquire, args=(1, "shared A"))
557

    
558
    # Start exclusive acquires
559
    for _ in xrange(3):
560
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
561

    
562
    # More shared acquires
563
    for _ in xrange(5):
564
      self._addThread(target=_Acquire, args=(1, "shared C"))
565

    
566
    # More exclusive acquires
567
    for _ in xrange(3):
568
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
569

    
570
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
571
    # together. There's no way to wait for SharedLock.acquire to start
572
    # its work. Hence the timeout of 2 seconds.
573
    pending = 0
574
    end_time = time.time() + 2.0
575
    while time.time() < end_time:
576
      pending = self.sl._count_pending()
577
      self.assert_(pending >= 0 and pending <= 7)
578
      if pending == 7:
579
        break
580
      time.sleep(0.05)
581
    self.assertEqual(pending, 7)
582

    
583
    # Release exclusive lock and wait
584
    self.sl.release()
585

    
586
    self._waitThreads()
587

    
588
    # Check sequence
589
    shr_a = 0
590
    shr_c = 0
591
    for _ in xrange(10):
592
      # Shared locks aren't guaranteed to be notified in order, but they'll be
593
      # first
594
      tmp = self.done.get_nowait()
595
      if tmp == "shared A":
596
        shr_a += 1
597
      elif tmp == "shared C":
598
        shr_c += 1
599
    self.assertEqual(shr_a, 5)
600
    self.assertEqual(shr_c, 5)
601

    
602
    for _ in xrange(3):
603
      self.assertEqual(self.done.get_nowait(), "exclusive B")
604

    
605
    for _ in xrange(3):
606
      self.assertEqual(self.done.get_nowait(), "exclusive D")
607

    
608
    self.assertRaises(Queue.Empty, self.done.get_nowait)
609

    
610
  @_Repeat
611
  def testMixedAcquireTimeout(self):
612
    sync = threading.Condition()
613

    
614
    def _AcquireShared(ev):
615
      if not self.sl.acquire(shared=1, timeout=None):
616
        return
617

    
618
      self.done.put("shared")
619

    
620
      # Notify main thread
621
      ev.set()
622

    
623
      # Wait for notification
624
      sync.acquire()
625
      try:
626
        sync.wait()
627
      finally:
628
        sync.release()
629

    
630
      # Release lock
631
      self.sl.release()
632

    
633
    acquires = []
634
    for _ in xrange(3):
635
      ev = threading.Event()
636
      self._addThread(target=_AcquireShared, args=(ev, ))
637
      acquires.append(ev)
638

    
639
    # Wait for all acquires to finish
640
    for i in acquires:
641
      i.wait()
642

    
643
    self.assertEqual(self.sl._count_pending(), 0)
644

    
645
    # Try to get exclusive lock
646
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
647

    
648
    # Acquire exclusive without timeout
649
    exclsync = threading.Condition()
650
    exclev = threading.Event()
651

    
652
    def _AcquireExclusive():
653
      if not self.sl.acquire(shared=0):
654
        return
655

    
656
      self.done.put("exclusive")
657

    
658
      # Notify main thread
659
      exclev.set()
660

    
661
      exclsync.acquire()
662
      try:
663
        exclsync.wait()
664
      finally:
665
        exclsync.release()
666

    
667
      self.sl.release()
668

    
669
    self._addThread(target=_AcquireExclusive)
670

    
671
    # Try to get exclusive lock
672
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
673

    
674
    # Make all shared holders release their locks
675
    sync.acquire()
676
    try:
677
      sync.notifyAll()
678
    finally:
679
      sync.release()
680

    
681
    # Wait for exclusive acquire to succeed
682
    exclev.wait()
683

    
684
    self.assertEqual(self.sl._count_pending(), 0)
685

    
686
    # Try to get exclusive lock
687
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
688

    
689
    def _AcquireSharedSimple():
690
      if self.sl.acquire(shared=1, timeout=None):
691
        self.done.put("shared2")
692
        self.sl.release()
693

    
694
    for _ in xrange(10):
695
      self._addThread(target=_AcquireSharedSimple)
696

    
697
    # Tell exclusive lock to release
698
    exclsync.acquire()
699
    try:
700
      exclsync.notifyAll()
701
    finally:
702
      exclsync.release()
703

    
704
    # Wait for everything to finish
705
    self._waitThreads()
706

    
707
    self.assertEqual(self.sl._count_pending(), 0)
708

    
709
    # Check sequence
710
    for _ in xrange(3):
711
      self.assertEqual(self.done.get_nowait(), "shared")
712

    
713
    self.assertEqual(self.done.get_nowait(), "exclusive")
714

    
715
    for _ in xrange(10):
716
      self.assertEqual(self.done.get_nowait(), "shared2")
717

    
718
    self.assertRaises(Queue.Empty, self.done.get_nowait)
719

    
720

    
721
class TestSSynchronizedDecorator(_ThreadedTestCase):
722
  """Shared Lock Synchronized decorator test"""
723

    
724
  def setUp(self):
725
    _ThreadedTestCase.setUp(self)
726

    
727
  @locking.ssynchronized(_decoratorlock)
728
  def _doItExclusive(self):
729
    self.assert_(_decoratorlock._is_owned())
730
    self.done.put('EXC')
731

    
732
  @locking.ssynchronized(_decoratorlock, shared=1)
733
  def _doItSharer(self):
734
    self.assert_(_decoratorlock._is_owned(shared=1))
735
    self.done.put('SHR')
736

    
737
  def testDecoratedFunctions(self):
738
    self._doItExclusive()
739
    self.assert_(not _decoratorlock._is_owned())
740
    self._doItSharer()
741
    self.assert_(not _decoratorlock._is_owned())
742

    
743
  def testSharersCanCoexist(self):
744
    _decoratorlock.acquire(shared=1)
745
    threading.Thread(target=self._doItSharer).start()
746
    self.assert_(self.done.get(True, 1))
747
    _decoratorlock.release()
748

    
749
  @_Repeat
750
  def testExclusiveBlocksExclusive(self):
751
    _decoratorlock.acquire()
752
    self._addThread(target=self._doItExclusive)
753
    # give it a bit of time to check that it's not actually doing anything
754
    self.assertRaises(Queue.Empty, self.done.get_nowait)
755
    _decoratorlock.release()
756
    self._waitThreads()
757
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
758

    
759
  @_Repeat
760
  def testExclusiveBlocksSharer(self):
761
    _decoratorlock.acquire()
762
    self._addThread(target=self._doItSharer)
763
    self.assertRaises(Queue.Empty, self.done.get_nowait)
764
    _decoratorlock.release()
765
    self._waitThreads()
766
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
767

    
768
  @_Repeat
769
  def testSharerBlocksExclusive(self):
770
    _decoratorlock.acquire(shared=1)
771
    self._addThread(target=self._doItExclusive)
772
    self.assertRaises(Queue.Empty, self.done.get_nowait)
773
    _decoratorlock.release()
774
    self._waitThreads()
775
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
776

    
777

    
778
class TestLockSet(_ThreadedTestCase):
779
  """LockSet tests"""
780

    
781
  def setUp(self):
782
    _ThreadedTestCase.setUp(self)
783
    self._setUpLS()
784

    
785
  def _setUpLS(self):
786
    """Helper to (re)initialize the lock set"""
787
    self.resources = ['one', 'two', 'three']
788
    self.ls = locking.LockSet(members=self.resources)
789

    
790
  def testResources(self):
791
    self.assertEquals(self.ls._names(), set(self.resources))
792
    newls = locking.LockSet()
793
    self.assertEquals(newls._names(), set())
794

    
795
  def testAcquireRelease(self):
796
    self.assert_(self.ls.acquire('one'))
797
    self.assertEquals(self.ls._list_owned(), set(['one']))
798
    self.ls.release()
799
    self.assertEquals(self.ls._list_owned(), set())
800
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
801
    self.assertEquals(self.ls._list_owned(), set(['one']))
802
    self.ls.release()
803
    self.assertEquals(self.ls._list_owned(), set())
804
    self.ls.acquire(['one', 'two', 'three'])
805
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
806
    self.ls.release('one')
807
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
808
    self.ls.release(['three'])
809
    self.assertEquals(self.ls._list_owned(), set(['two']))
810
    self.ls.release()
811
    self.assertEquals(self.ls._list_owned(), set())
812
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
813
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
814
    self.ls.release()
815
    self.assertEquals(self.ls._list_owned(), set())
816

    
817
  def testNoDoubleAcquire(self):
818
    self.ls.acquire('one')
819
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
820
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
821
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
822
    self.ls.release()
823
    self.ls.acquire(['one', 'three'])
824
    self.ls.release('one')
825
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
826
    self.ls.release('three')
827

    
828
  def testNoWrongRelease(self):
829
    self.assertRaises(AssertionError, self.ls.release)
830
    self.ls.acquire('one')
831
    self.assertRaises(AssertionError, self.ls.release, 'two')
832

    
833
  def testAddRemove(self):
834
    self.ls.add('four')
835
    self.assertEquals(self.ls._list_owned(), set())
836
    self.assert_('four' in self.ls._names())
837
    self.ls.add(['five', 'six', 'seven'], acquired=1)
838
    self.assert_('five' in self.ls._names())
839
    self.assert_('six' in self.ls._names())
840
    self.assert_('seven' in self.ls._names())
841
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
842
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
843
    self.assert_('five' not in self.ls._names())
844
    self.assert_('six' not in self.ls._names())
845
    self.assertEquals(self.ls._list_owned(), set(['seven']))
846
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
847
    self.ls.remove('seven')
848
    self.assert_('seven' not in self.ls._names())
849
    self.assertEquals(self.ls._list_owned(), set([]))
850
    self.ls.acquire(None, shared=1)
851
    self.assertRaises(AssertionError, self.ls.add, 'eight')
852
    self.ls.release()
853
    self.ls.acquire(None)
854
    self.ls.add('eight', acquired=1)
855
    self.assert_('eight' in self.ls._names())
856
    self.assert_('eight' in self.ls._list_owned())
857
    self.ls.add('nine')
858
    self.assert_('nine' in self.ls._names())
859
    self.assert_('nine' not in self.ls._list_owned())
860
    self.ls.release()
861
    self.ls.remove(['two'])
862
    self.assert_('two' not in self.ls._names())
863
    self.ls.acquire('three')
864
    self.assertEquals(self.ls.remove(['three']), ['three'])
865
    self.assert_('three' not in self.ls._names())
866
    self.assertEquals(self.ls.remove('three'), [])
867
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
868
    self.assert_('one' not in self.ls._names())
869

    
870
  def testRemoveNonBlocking(self):
871
    self.ls.acquire('one')
872
    self.assertEquals(self.ls.remove('one'), ['one'])
873
    self.ls.acquire(['two', 'three'])
874
    self.assertEquals(self.ls.remove(['two', 'three']),
875
                      ['two', 'three'])
876

    
877
  def testNoDoubleAdd(self):
878
    self.assertRaises(errors.LockError, self.ls.add, 'two')
879
    self.ls.add('four')
880
    self.assertRaises(errors.LockError, self.ls.add, 'four')
881

    
882
  def testNoWrongRemoves(self):
883
    self.ls.acquire(['one', 'three'], shared=1)
884
    # Cannot remove 'two' while holding something which is not a superset
885
    self.assertRaises(AssertionError, self.ls.remove, 'two')
886
    # Cannot remove 'three' as we are sharing it
887
    self.assertRaises(AssertionError, self.ls.remove, 'three')
888

    
889
  def testAcquireSetLock(self):
890
    # acquire the set-lock exclusively
891
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
892
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
893
    self.assertEquals(self.ls._is_owned(), True)
894
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
895
    # I can still add/remove elements...
896
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
897
    self.assert_(self.ls.add('six'))
898
    self.ls.release()
899
    # share the set-lock
900
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
901
    # adding new elements is not possible
902
    self.assertRaises(AssertionError, self.ls.add, 'five')
903
    self.ls.release()
904

    
905
  def testAcquireWithRepetitions(self):
906
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
907
                      set(['two', 'two', 'three']))
908
    self.ls.release(['two', 'two'])
909
    self.assertEquals(self.ls._list_owned(), set(['three']))
910

    
911
  def testEmptyAcquire(self):
912
    # Acquire an empty list of locks...
913
    self.assertEquals(self.ls.acquire([]), set())
914
    self.assertEquals(self.ls._list_owned(), set())
915
    # New locks can still be addded
916
    self.assert_(self.ls.add('six'))
917
    # "re-acquiring" is not an issue, since we had really acquired nothing
918
    self.assertEquals(self.ls.acquire([], shared=1), set())
919
    self.assertEquals(self.ls._list_owned(), set())
920
    # We haven't really acquired anything, so we cannot release
921
    self.assertRaises(AssertionError, self.ls.release)
922

    
923
  def _doLockSet(self, names, shared):
924
    try:
925
      self.ls.acquire(names, shared=shared)
926
      self.done.put('DONE')
927
      self.ls.release()
928
    except errors.LockError:
929
      self.done.put('ERR')
930

    
931
  def _doAddSet(self, names):
932
    try:
933
      self.ls.add(names, acquired=1)
934
      self.done.put('DONE')
935
      self.ls.release()
936
    except errors.LockError:
937
      self.done.put('ERR')
938

    
939
  def _doRemoveSet(self, names):
940
    self.done.put(self.ls.remove(names))
941

    
942
  @_Repeat
943
  def testConcurrentSharedAcquire(self):
944
    self.ls.acquire(['one', 'two'], shared=1)
945
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
946
    self._waitThreads()
947
    self.assertEqual(self.done.get_nowait(), 'DONE')
948
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
949
    self._waitThreads()
950
    self.assertEqual(self.done.get_nowait(), 'DONE')
951
    self._addThread(target=self._doLockSet, args=('three', 1))
952
    self._waitThreads()
953
    self.assertEqual(self.done.get_nowait(), 'DONE')
954
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
955
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
956
    self.assertRaises(Queue.Empty, self.done.get_nowait)
957
    self.ls.release()
958
    self._waitThreads()
959
    self.assertEqual(self.done.get_nowait(), 'DONE')
960
    self.assertEqual(self.done.get_nowait(), 'DONE')
961

    
962
  @_Repeat
963
  def testConcurrentExclusiveAcquire(self):
964
    self.ls.acquire(['one', 'two'])
965
    self._addThread(target=self._doLockSet, args=('three', 1))
966
    self._waitThreads()
967
    self.assertEqual(self.done.get_nowait(), 'DONE')
968
    self._addThread(target=self._doLockSet, args=('three', 0))
969
    self._waitThreads()
970
    self.assertEqual(self.done.get_nowait(), 'DONE')
971
    self.assertRaises(Queue.Empty, self.done.get_nowait)
972
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
973
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
974
    self._addThread(target=self._doLockSet, args=('one', 0))
975
    self._addThread(target=self._doLockSet, args=('one', 1))
976
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
977
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
978
    self.assertRaises(Queue.Empty, self.done.get_nowait)
979
    self.ls.release()
980
    self._waitThreads()
981
    for _ in range(6):
982
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
983

    
984
  @_Repeat
985
  def testConcurrentRemove(self):
986
    self.ls.add('four')
987
    self.ls.acquire(['one', 'two', 'four'])
988
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
989
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
990
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
991
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
992
    self.assertRaises(Queue.Empty, self.done.get_nowait)
993
    self.ls.remove('one')
994
    self.ls.release()
995
    self._waitThreads()
996
    for i in range(4):
997
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
998
    self.ls.add(['five', 'six'], acquired=1)
999
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1000
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1001
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1002
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1003
    self.ls.remove('five')
1004
    self.ls.release()
1005
    self._waitThreads()
1006
    for i in range(4):
1007
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1008
    self.ls.acquire(['three', 'four'])
1009
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1010
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1011
    self.ls.remove('four')
1012
    self._waitThreads()
1013
    self.assertEqual(self.done.get_nowait(), ['six'])
1014
    self._addThread(target=self._doRemoveSet, args=(['two']))
1015
    self._waitThreads()
1016
    self.assertEqual(self.done.get_nowait(), ['two'])
1017
    self.ls.release()
1018
    # reset lockset
1019
    self._setUpLS()
1020

    
1021
  @_Repeat
1022
  def testConcurrentSharedSetLock(self):
1023
    # share the set-lock...
1024
    self.ls.acquire(None, shared=1)
1025
    # ...another thread can share it too
1026
    self._addThread(target=self._doLockSet, args=(None, 1))
1027
    self._waitThreads()
1028
    self.assertEqual(self.done.get_nowait(), 'DONE')
1029
    # ...or just share some elements
1030
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1031
    self._waitThreads()
1032
    self.assertEqual(self.done.get_nowait(), 'DONE')
1033
    # ...but not add new ones or remove any
1034
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1035
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1036
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1037
    # this just releases the set-lock
1038
    self.ls.release([])
1039
    t.join(60)
1040
    self.assertEqual(self.done.get_nowait(), 'DONE')
1041
    # release the lock on the actual elements so remove() can proceed too
1042
    self.ls.release()
1043
    self._waitThreads()
1044
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1045
    # reset lockset
1046
    self._setUpLS()
1047

    
1048
  @_Repeat
1049
  def testConcurrentExclusiveSetLock(self):
1050
    # acquire the set-lock...
1051
    self.ls.acquire(None, shared=0)
1052
    # ...no one can do anything else
1053
    self._addThread(target=self._doLockSet, args=(None, 1))
1054
    self._addThread(target=self._doLockSet, args=(None, 0))
1055
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1056
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1057
    self._addThread(target=self._doAddSet, args=(['nine']))
1058
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1059
    self.ls.release()
1060
    self._waitThreads()
1061
    for _ in range(5):
1062
      self.assertEqual(self.done.get(True, 1), 'DONE')
1063
    # cleanup
1064
    self._setUpLS()
1065

    
1066
  @_Repeat
1067
  def testConcurrentSetLockAdd(self):
1068
    self.ls.acquire('one')
1069
    # Another thread wants the whole SetLock
1070
    self._addThread(target=self._doLockSet, args=(None, 0))
1071
    self._addThread(target=self._doLockSet, args=(None, 1))
1072
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1073
    self.assertRaises(AssertionError, self.ls.add, 'four')
1074
    self.ls.release()
1075
    self._waitThreads()
1076
    self.assertEqual(self.done.get_nowait(), 'DONE')
1077
    self.assertEqual(self.done.get_nowait(), 'DONE')
1078
    self.ls.acquire(None)
1079
    self._addThread(target=self._doLockSet, args=(None, 0))
1080
    self._addThread(target=self._doLockSet, args=(None, 1))
1081
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1082
    self.ls.add('four')
1083
    self.ls.add('five', acquired=1)
1084
    self.ls.add('six', acquired=1, shared=1)
1085
    self.assertEquals(self.ls._list_owned(),
1086
      set(['one', 'two', 'three', 'five', 'six']))
1087
    self.assertEquals(self.ls._is_owned(), True)
1088
    self.assertEquals(self.ls._names(),
1089
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1090
    self.ls.release()
1091
    self._waitThreads()
1092
    self.assertEqual(self.done.get_nowait(), 'DONE')
1093
    self.assertEqual(self.done.get_nowait(), 'DONE')
1094
    self._setUpLS()
1095

    
1096
  @_Repeat
1097
  def testEmptyLockSet(self):
1098
    # get the set-lock
1099
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1100
    # now empty it...
1101
    self.ls.remove(['one', 'two', 'three'])
1102
    # and adds/locks by another thread still wait
1103
    self._addThread(target=self._doAddSet, args=(['nine']))
1104
    self._addThread(target=self._doLockSet, args=(None, 1))
1105
    self._addThread(target=self._doLockSet, args=(None, 0))
1106
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1107
    self.ls.release()
1108
    self._waitThreads()
1109
    for _ in range(3):
1110
      self.assertEqual(self.done.get_nowait(), 'DONE')
1111
    # empty it again...
1112
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1113
    # now share it...
1114
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1115
    # other sharers can go, adds still wait
1116
    self._addThread(target=self._doLockSet, args=(None, 1))
1117
    self._waitThreads()
1118
    self.assertEqual(self.done.get_nowait(), 'DONE')
1119
    self._addThread(target=self._doAddSet, args=(['nine']))
1120
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1121
    self.ls.release()
1122
    self._waitThreads()
1123
    self.assertEqual(self.done.get_nowait(), 'DONE')
1124
    self._setUpLS()
1125

    
1126

    
1127
class TestGanetiLockManager(_ThreadedTestCase):
1128

    
1129
  def setUp(self):
1130
    _ThreadedTestCase.setUp(self)
1131
    self.nodes=['n1', 'n2']
1132
    self.instances=['i1', 'i2', 'i3']
1133
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1134
                                        instances=self.instances)
1135

    
1136
  def tearDown(self):
1137
    # Don't try this at home...
1138
    locking.GanetiLockManager._instance = None
1139

    
1140
  def testLockingConstants(self):
1141
    # The locking library internally cheats by assuming its constants have some
1142
    # relationships with each other. Check those hold true.
1143
    # This relationship is also used in the Processor to recursively acquire
1144
    # the right locks. Again, please don't break it.
1145
    for i in range(len(locking.LEVELS)):
1146
      self.assertEqual(i, locking.LEVELS[i])
1147

    
1148
  def testDoubleGLFails(self):
1149
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1150

    
1151
  def testLockNames(self):
1152
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1153
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1154
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1155
                     set(self.instances))
1156

    
1157
  def testInitAndResources(self):
1158
    locking.GanetiLockManager._instance = None
1159
    self.GL = locking.GanetiLockManager()
1160
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1161
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1162
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1163

    
1164
    locking.GanetiLockManager._instance = None
1165
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1166
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1167
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1168
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1169

    
1170
    locking.GanetiLockManager._instance = None
1171
    self.GL = locking.GanetiLockManager(instances=self.instances)
1172
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1173
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1174
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1175
                     set(self.instances))
1176

    
1177
  def testAcquireRelease(self):
1178
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1179
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1180
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1181
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1182
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1183
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1184
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1185
    self.GL.release(locking.LEVEL_NODE)
1186
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1187
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1188
    self.GL.release(locking.LEVEL_INSTANCE)
1189
    self.assertRaises(errors.LockError, self.GL.acquire,
1190
                      locking.LEVEL_INSTANCE, ['i5'])
1191
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1192
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1193

    
1194
  def testAcquireWholeSets(self):
1195
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1196
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1197
                      set(self.instances))
1198
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1199
                      set(self.instances))
1200
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1201
                      set(self.nodes))
1202
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1203
                      set(self.nodes))
1204
    self.GL.release(locking.LEVEL_NODE)
1205
    self.GL.release(locking.LEVEL_INSTANCE)
1206
    self.GL.release(locking.LEVEL_CLUSTER)
1207

    
1208
  def testAcquireWholeAndPartial(self):
1209
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1210
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1211
                      set(self.instances))
1212
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1213
                      set(self.instances))
1214
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1215
                      set(['n2']))
1216
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1217
                      set(['n2']))
1218
    self.GL.release(locking.LEVEL_NODE)
1219
    self.GL.release(locking.LEVEL_INSTANCE)
1220
    self.GL.release(locking.LEVEL_CLUSTER)
1221

    
1222
  def testBGLDependency(self):
1223
    self.assertRaises(AssertionError, self.GL.acquire,
1224
                      locking.LEVEL_NODE, ['n1', 'n2'])
1225
    self.assertRaises(AssertionError, self.GL.acquire,
1226
                      locking.LEVEL_INSTANCE, ['i3'])
1227
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1228
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1229
    self.assertRaises(AssertionError, self.GL.release,
1230
                      locking.LEVEL_CLUSTER, ['BGL'])
1231
    self.assertRaises(AssertionError, self.GL.release,
1232
                      locking.LEVEL_CLUSTER)
1233
    self.GL.release(locking.LEVEL_NODE)
1234
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1235
    self.assertRaises(AssertionError, self.GL.release,
1236
                      locking.LEVEL_CLUSTER, ['BGL'])
1237
    self.assertRaises(AssertionError, self.GL.release,
1238
                      locking.LEVEL_CLUSTER)
1239
    self.GL.release(locking.LEVEL_INSTANCE)
1240

    
1241
  def testWrongOrder(self):
1242
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1243
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1244
    self.assertRaises(AssertionError, self.GL.acquire,
1245
                      locking.LEVEL_NODE, ['n1'])
1246
    self.assertRaises(AssertionError, self.GL.acquire,
1247
                      locking.LEVEL_INSTANCE, ['i2'])
1248

    
1249
  # Helper function to run as a thread that shared the BGL and then acquires
1250
  # some locks at another level.
1251
  def _doLock(self, level, names, shared):
1252
    try:
1253
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1254
      self.GL.acquire(level, names, shared=shared)
1255
      self.done.put('DONE')
1256
      self.GL.release(level)
1257
      self.GL.release(locking.LEVEL_CLUSTER)
1258
    except errors.LockError:
1259
      self.done.put('ERR')
1260

    
1261
  @_Repeat
1262
  def testConcurrency(self):
1263
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1264
    self._addThread(target=self._doLock,
1265
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1266
    self._waitThreads()
1267
    self.assertEqual(self.done.get_nowait(), 'DONE')
1268
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1269
    self._addThread(target=self._doLock,
1270
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1271
    self._waitThreads()
1272
    self.assertEqual(self.done.get_nowait(), 'DONE')
1273
    self._addThread(target=self._doLock,
1274
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1275
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1276
    self.GL.release(locking.LEVEL_INSTANCE)
1277
    self._waitThreads()
1278
    self.assertEqual(self.done.get_nowait(), 'DONE')
1279
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1280
    self._addThread(target=self._doLock,
1281
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1282
    self._waitThreads()
1283
    self.assertEqual(self.done.get_nowait(), 'DONE')
1284
    self._addThread(target=self._doLock,
1285
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1286
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1287
    self.GL.release(locking.LEVEL_INSTANCE)
1288
    self._waitThreads()
1289
    self.assertEqual(self.done.get(True, 1), 'DONE')
1290
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1291

    
1292

    
1293
if __name__ == '__main__':
1294
  unittest.main()
1295
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1296
  #unittest.TextTestRunner(verbosity=2).run(suite)