Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ b8140229

History | View | Annotate | Download (44 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.done = Queue.Queue(0)
56
    self.threads = []
57

    
58
  def _addThread(self, *args, **kwargs):
59
    """Create and remember a new thread"""
60
    t = threading.Thread(*args, **kwargs)
61
    self.threads.append(t)
62
    t.start()
63
    return t
64

    
65
  def _waitThreads(self):
66
    """Wait for all our threads to finish"""
67
    for t in self.threads:
68
      t.join(60)
69
      self.failIf(t.isAlive())
70
    self.threads = []
71

    
72

    
73
class _ConditionTestCase(_ThreadedTestCase):
74
  """Common test case for conditions"""
75

    
76
  def setUp(self, cls):
77
    _ThreadedTestCase.setUp(self)
78
    self.lock = threading.Lock()
79
    self.cond = cls(self.lock)
80

    
81
  def _testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def _testNotification(self):
97
    def _NotifyAll():
98
      self.done.put("NE")
99
      self.cond.acquire()
100
      self.done.put("NA")
101
      self.cond.notifyAll()
102
      self.done.put("NN")
103
      self.cond.release()
104

    
105
    self.cond.acquire()
106
    self._addThread(target=_NotifyAll)
107
    self.assertEqual(self.done.get(True, 1), "NE")
108
    self.assertRaises(Queue.Empty, self.done.get_nowait)
109
    self.cond.wait()
110
    self.assertEqual(self.done.get(True, 1), "NA")
111
    self.assertEqual(self.done.get(True, 1), "NN")
112
    self.assert_(self.cond._is_owned())
113
    self.cond.release()
114
    self.assert_(not self.cond._is_owned())
115

    
116

    
117
class TestPipeCondition(_ConditionTestCase):
118
  """_PipeCondition tests"""
119

    
120
  def setUp(self):
121
    _ConditionTestCase.setUp(self, locking._PipeCondition)
122

    
123
  def testAcquireRelease(self):
124
    self._testAcquireRelease()
125

    
126
  def testNotification(self):
127
    self._testNotification()
128

    
129
  def _TestWait(self, fn):
130
    self._addThread(target=fn)
131
    self._addThread(target=fn)
132
    self._addThread(target=fn)
133

    
134
    # Wait for threads to be waiting
135
    self.assertEqual(self.done.get(True, 1), "A")
136
    self.assertEqual(self.done.get(True, 1), "A")
137
    self.assertEqual(self.done.get(True, 1), "A")
138

    
139
    self.assertRaises(Queue.Empty, self.done.get_nowait)
140

    
141
    self.cond.acquire()
142
    self.assertEqual(self.cond._nwaiters, 3)
143
    # This new thread can"t acquire the lock, and thus call wait, before we
144
    # release it
145
    self._addThread(target=fn)
146
    self.cond.notifyAll()
147
    self.assertRaises(Queue.Empty, self.done.get_nowait)
148
    self.cond.release()
149

    
150
    # We should now get 3 W and 1 A (for the new thread) in whatever order
151
    w = 0
152
    a = 0
153
    for i in range(4):
154
      got = self.done.get(True, 1)
155
      if got == "W":
156
        w += 1
157
      elif got == "A":
158
        a += 1
159
      else:
160
        self.fail("Got %s on the done queue" % got)
161

    
162
    self.assertEqual(w, 3)
163
    self.assertEqual(a, 1)
164

    
165
    self.cond.acquire()
166
    self.cond.notifyAll()
167
    self.cond.release()
168
    self._waitThreads()
169
    self.assertEqual(self.done.get_nowait(), "W")
170
    self.assertRaises(Queue.Empty, self.done.get_nowait)
171

    
172
  def testBlockingWait(self):
173
    def _BlockingWait():
174
      self.cond.acquire()
175
      self.done.put("A")
176
      self.cond.wait()
177
      self.cond.release()
178
      self.done.put("W")
179

    
180
    self._TestWait(_BlockingWait)
181

    
182
  def testLongTimeoutWait(self):
183
    def _Helper():
184
      self.cond.acquire()
185
      self.done.put("A")
186
      self.cond.wait(15.0)
187
      self.cond.release()
188
      self.done.put("W")
189

    
190
    self._TestWait(_Helper)
191

    
192
  def _TimeoutWait(self, timeout, check):
193
    self.cond.acquire()
194
    self.cond.wait(timeout)
195
    self.cond.release()
196
    self.done.put(check)
197

    
198
  def testShortTimeoutWait(self):
199
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
200
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
201
    self._waitThreads()
202
    self.assertEqual(self.done.get_nowait(), "T1")
203
    self.assertEqual(self.done.get_nowait(), "T1")
204
    self.assertRaises(Queue.Empty, self.done.get_nowait)
205

    
206
  def testZeroTimeoutWait(self):
207
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
208
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
209
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
210
    self._waitThreads()
211
    self.assertEqual(self.done.get_nowait(), "T0")
212
    self.assertEqual(self.done.get_nowait(), "T0")
213
    self.assertEqual(self.done.get_nowait(), "T0")
214
    self.assertRaises(Queue.Empty, self.done.get_nowait)
215

    
216

    
217
class TestSingleActionPipeCondition(unittest.TestCase):
218
  """_SingleActionPipeCondition tests"""
219

    
220
  def setUp(self):
221
    self.cond = locking._SingleActionPipeCondition()
222

    
223
  def testInitialization(self):
224
    self.assert_(self.cond._read_fd is not None)
225
    self.assert_(self.cond._write_fd is not None)
226
    self.assert_(self.cond._poller is not None)
227
    self.assertEqual(self.cond._nwaiters, 0)
228

    
229
  def testUsageCount(self):
230
    self.cond.StartWaiting()
231
    self.assert_(self.cond._read_fd is not None)
232
    self.assert_(self.cond._write_fd is not None)
233
    self.assert_(self.cond._poller is not None)
234
    self.assertEqual(self.cond._nwaiters, 1)
235

    
236
    # use again
237
    self.cond.StartWaiting()
238
    self.assertEqual(self.cond._nwaiters, 2)
239

    
240
    # there is more than one user
241
    self.assert_(not self.cond.DoneWaiting())
242
    self.assert_(self.cond._read_fd is not None)
243
    self.assert_(self.cond._write_fd is not None)
244
    self.assert_(self.cond._poller is not None)
245
    self.assertEqual(self.cond._nwaiters, 1)
246

    
247
    self.assert_(self.cond.DoneWaiting())
248
    self.assertEqual(self.cond._nwaiters, 0)
249
    self.assert_(self.cond._read_fd is None)
250
    self.assert_(self.cond._write_fd is None)
251
    self.assert_(self.cond._poller is None)
252

    
253
  def testNotify(self):
254
    wait1 = self.cond.StartWaiting()
255
    wait2 = self.cond.StartWaiting()
256

    
257
    self.assert_(self.cond._read_fd is not None)
258
    self.assert_(self.cond._write_fd is not None)
259
    self.assert_(self.cond._poller is not None)
260

    
261
    self.cond.notifyAll()
262

    
263
    self.assert_(self.cond._read_fd is not None)
264
    self.assert_(self.cond._write_fd is None)
265
    self.assert_(self.cond._poller is not None)
266

    
267
    self.assert_(not self.cond.DoneWaiting())
268

    
269
    self.assert_(self.cond._read_fd is not None)
270
    self.assert_(self.cond._write_fd is None)
271
    self.assert_(self.cond._poller is not None)
272

    
273
    self.assert_(self.cond.DoneWaiting())
274

    
275
    self.assert_(self.cond._read_fd is None)
276
    self.assert_(self.cond._write_fd is None)
277
    self.assert_(self.cond._poller is None)
278

    
279
  def testReusage(self):
280
    self.cond.StartWaiting()
281
    self.assert_(self.cond._read_fd is not None)
282
    self.assert_(self.cond._write_fd is not None)
283
    self.assert_(self.cond._poller is not None)
284

    
285
    self.assert_(self.cond.DoneWaiting())
286

    
287
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
288
    self.assert_(self.cond._read_fd is None)
289
    self.assert_(self.cond._write_fd is None)
290
    self.assert_(self.cond._poller is None)
291

    
292
  def testNotifyTwice(self):
293
    self.cond.notifyAll()
294
    self.assertRaises(RuntimeError, self.cond.notifyAll)
295

    
296

    
297
class TestSharedLock(_ThreadedTestCase):
298
  """SharedLock tests"""
299

    
300
  def setUp(self):
301
    _ThreadedTestCase.setUp(self)
302
    self.sl = locking.SharedLock()
303

    
304
  def testSequenceAndOwnership(self):
305
    self.assert_(not self.sl._is_owned())
306
    self.sl.acquire(shared=1)
307
    self.assert_(self.sl._is_owned())
308
    self.assert_(self.sl._is_owned(shared=1))
309
    self.assert_(not self.sl._is_owned(shared=0))
310
    self.sl.release()
311
    self.assert_(not self.sl._is_owned())
312
    self.sl.acquire()
313
    self.assert_(self.sl._is_owned())
314
    self.assert_(not self.sl._is_owned(shared=1))
315
    self.assert_(self.sl._is_owned(shared=0))
316
    self.sl.release()
317
    self.assert_(not self.sl._is_owned())
318
    self.sl.acquire(shared=1)
319
    self.assert_(self.sl._is_owned())
320
    self.assert_(self.sl._is_owned(shared=1))
321
    self.assert_(not self.sl._is_owned(shared=0))
322
    self.sl.release()
323
    self.assert_(not self.sl._is_owned())
324

    
325
  def testBooleanValue(self):
326
    # semaphores are supposed to return a true value on a successful acquire
327
    self.assert_(self.sl.acquire(shared=1))
328
    self.sl.release()
329
    self.assert_(self.sl.acquire())
330
    self.sl.release()
331

    
332
  def testDoubleLockingStoE(self):
333
    self.sl.acquire(shared=1)
334
    self.assertRaises(AssertionError, self.sl.acquire)
335

    
336
  def testDoubleLockingEtoS(self):
337
    self.sl.acquire()
338
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
339

    
340
  def testDoubleLockingStoS(self):
341
    self.sl.acquire(shared=1)
342
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
343

    
344
  def testDoubleLockingEtoE(self):
345
    self.sl.acquire()
346
    self.assertRaises(AssertionError, self.sl.acquire)
347

    
348
  # helper functions: called in a separate thread they acquire the lock, send
349
  # their identifier on the done queue, then release it.
350
  def _doItSharer(self):
351
    try:
352
      self.sl.acquire(shared=1)
353
      self.done.put('SHR')
354
      self.sl.release()
355
    except errors.LockError:
356
      self.done.put('ERR')
357

    
358
  def _doItExclusive(self):
359
    try:
360
      self.sl.acquire()
361
      self.done.put('EXC')
362
      self.sl.release()
363
    except errors.LockError:
364
      self.done.put('ERR')
365

    
366
  def _doItDelete(self):
367
    try:
368
      self.sl.delete()
369
      self.done.put('DEL')
370
    except errors.LockError:
371
      self.done.put('ERR')
372

    
373
  def testSharersCanCoexist(self):
374
    self.sl.acquire(shared=1)
375
    threading.Thread(target=self._doItSharer).start()
376
    self.assert_(self.done.get(True, 1))
377
    self.sl.release()
378

    
379
  @_Repeat
380
  def testExclusiveBlocksExclusive(self):
381
    self.sl.acquire()
382
    self._addThread(target=self._doItExclusive)
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384
    self.sl.release()
385
    self._waitThreads()
386
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
387

    
388
  @_Repeat
389
  def testExclusiveBlocksDelete(self):
390
    self.sl.acquire()
391
    self._addThread(target=self._doItDelete)
392
    self.assertRaises(Queue.Empty, self.done.get_nowait)
393
    self.sl.release()
394
    self._waitThreads()
395
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
396
    self.sl = locking.SharedLock()
397

    
398
  @_Repeat
399
  def testExclusiveBlocksSharer(self):
400
    self.sl.acquire()
401
    self._addThread(target=self._doItSharer)
402
    self.assertRaises(Queue.Empty, self.done.get_nowait)
403
    self.sl.release()
404
    self._waitThreads()
405
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
406

    
407
  @_Repeat
408
  def testSharerBlocksExclusive(self):
409
    self.sl.acquire(shared=1)
410
    self._addThread(target=self._doItExclusive)
411
    self.assertRaises(Queue.Empty, self.done.get_nowait)
412
    self.sl.release()
413
    self._waitThreads()
414
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
415

    
416
  @_Repeat
417
  def testSharerBlocksDelete(self):
418
    self.sl.acquire(shared=1)
419
    self._addThread(target=self._doItDelete)
420
    self.assertRaises(Queue.Empty, self.done.get_nowait)
421
    self.sl.release()
422
    self._waitThreads()
423
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
424
    self.sl = locking.SharedLock()
425

    
426
  @_Repeat
427
  def testWaitingExclusiveBlocksSharer(self):
428
    """SKIPPED testWaitingExclusiveBlockSharer"""
429
    return
430

    
431
    self.sl.acquire(shared=1)
432
    # the lock is acquired in shared mode...
433
    self._addThread(target=self._doItExclusive)
434
    # ...but now an exclusive is waiting...
435
    self._addThread(target=self._doItSharer)
436
    # ...so the sharer should be blocked as well
437
    self.assertRaises(Queue.Empty, self.done.get_nowait)
438
    self.sl.release()
439
    self._waitThreads()
440
    # The exclusive passed before
441
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
442
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
443

    
444
  @_Repeat
445
  def testWaitingSharerBlocksExclusive(self):
446
    """SKIPPED testWaitingSharerBlocksExclusive"""
447
    return
448

    
449
    self.sl.acquire()
450
    # the lock is acquired in exclusive mode...
451
    self._addThread(target=self._doItSharer)
452
    # ...but now a sharer is waiting...
453
    self._addThread(target=self._doItExclusive)
454
    # ...the exclusive is waiting too...
455
    self.assertRaises(Queue.Empty, self.done.get_nowait)
456
    self.sl.release()
457
    self._waitThreads()
458
    # The sharer passed before
459
    self.assertEqual(self.done.get_nowait(), 'SHR')
460
    self.assertEqual(self.done.get_nowait(), 'EXC')
461

    
462
  def testDelete(self):
463
    self.sl.delete()
464
    self.assertRaises(errors.LockError, self.sl.acquire)
465
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
466
    self.assertRaises(errors.LockError, self.sl.delete)
467

    
468
  def testDeleteTimeout(self):
469
    self.sl.delete(timeout=60)
470

    
471
  def testNoDeleteIfSharer(self):
472
    self.sl.acquire(shared=1)
473
    self.assertRaises(AssertionError, self.sl.delete)
474

    
475
  @_Repeat
476
  def testDeletePendingSharersExclusiveDelete(self):
477
    self.sl.acquire()
478
    self._addThread(target=self._doItSharer)
479
    self._addThread(target=self._doItSharer)
480
    self._addThread(target=self._doItExclusive)
481
    self._addThread(target=self._doItDelete)
482
    self.sl.delete()
483
    self._waitThreads()
484
    # The threads who were pending return ERR
485
    for _ in range(4):
486
      self.assertEqual(self.done.get_nowait(), 'ERR')
487
    self.sl = locking.SharedLock()
488

    
489
  @_Repeat
490
  def testDeletePendingDeleteExclusiveSharers(self):
491
    self.sl.acquire()
492
    self._addThread(target=self._doItDelete)
493
    self._addThread(target=self._doItExclusive)
494
    self._addThread(target=self._doItSharer)
495
    self._addThread(target=self._doItSharer)
496
    self.sl.delete()
497
    self._waitThreads()
498
    # The two threads who were pending return both ERR
499
    self.assertEqual(self.done.get_nowait(), 'ERR')
500
    self.assertEqual(self.done.get_nowait(), 'ERR')
501
    self.assertEqual(self.done.get_nowait(), 'ERR')
502
    self.assertEqual(self.done.get_nowait(), 'ERR')
503
    self.sl = locking.SharedLock()
504

    
505
  @_Repeat
506
  def testExclusiveAcquireTimeout(self):
507
    def _LockExclusive(wait):
508
      self.sl.acquire(shared=0)
509
      self.done.put("A: start sleep")
510
      time.sleep(wait)
511
      self.done.put("A: end sleep")
512
      self.sl.release()
513

    
514
    for shared in [0, 1]:
515
      # Start thread to hold lock for 20 ms
516
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
517

    
518
      # Wait for sleep to begin
519
      self.assertEqual(self.done.get(), "A: start sleep")
520

    
521
      # Wait up to 100 ms to get lock
522
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
523
      self.done.put("got 2nd")
524
      self.sl.release()
525

    
526
      self._waitThreads()
527

    
528
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
529
      self.assertEqual(self.done.get_nowait(), "got 2nd")
530
      self.assertRaises(Queue.Empty, self.done.get_nowait)
531

    
532
  @_Repeat
533
  def testAcquireExpiringTimeout(self):
534
    def _AcquireWithTimeout(shared, timeout):
535
      if not self.sl.acquire(shared=shared, timeout=timeout):
536
        self.done.put("timeout")
537

    
538
    for shared in [0, 1]:
539
      # Lock exclusively
540
      self.sl.acquire()
541

    
542
      # Start shared acquires with timeout between 0 and 20 ms
543
      for i in xrange(11):
544
        self._addThread(target=_AcquireWithTimeout,
545
                        args=(shared, i * 2.0 / 1000.0))
546

    
547
      # Wait for threads to finish (makes sure the acquire timeout expires
548
      # before releasing the lock)
549
      self._waitThreads()
550

    
551
      # Release lock
552
      self.sl.release()
553

    
554
      for _ in xrange(11):
555
        self.assertEqual(self.done.get_nowait(), "timeout")
556

    
557
      self.assertRaises(Queue.Empty, self.done.get_nowait)
558

    
559
  @_Repeat
560
  def testSharedSkipExclusiveAcquires(self):
561
    # Tests whether shared acquires jump in front of exclusive acquires in the
562
    # queue.
563

    
564
    # Get exclusive lock while we fill the queue
565
    self.sl.acquire()
566

    
567
    def _Acquire(shared, name):
568
      if not self.sl.acquire(shared=shared):
569
        return
570

    
571
      self.done.put(name)
572
      self.sl.release()
573

    
574
    # Start shared acquires
575
    for _ in xrange(5):
576
      self._addThread(target=_Acquire, args=(1, "shared A"))
577

    
578
    # Start exclusive acquires
579
    for _ in xrange(3):
580
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
581

    
582
    # More shared acquires
583
    for _ in xrange(5):
584
      self._addThread(target=_Acquire, args=(1, "shared C"))
585

    
586
    # More exclusive acquires
587
    for _ in xrange(3):
588
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
589

    
590
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
591
    # together. There's no way to wait for SharedLock.acquire to start
592
    # its work. Hence the timeout of 2 seconds.
593
    pending = 0
594
    end_time = time.time() + 2.0
595
    while time.time() < end_time:
596
      pending = self.sl._count_pending()
597
      self.assert_(pending >= 0 and pending <= 7)
598
      if pending == 7:
599
        break
600
      time.sleep(0.05)
601
    self.assertEqual(pending, 7)
602

    
603
    # Release exclusive lock and wait
604
    self.sl.release()
605

    
606
    self._waitThreads()
607

    
608
    # Check sequence
609
    shr_a = 0
610
    shr_c = 0
611
    for _ in xrange(10):
612
      # Shared locks aren't guaranteed to be notified in order, but they'll be
613
      # first
614
      tmp = self.done.get_nowait()
615
      if tmp == "shared A":
616
        shr_a += 1
617
      elif tmp == "shared C":
618
        shr_c += 1
619
    self.assertEqual(shr_a, 5)
620
    self.assertEqual(shr_c, 5)
621

    
622
    for _ in xrange(3):
623
      self.assertEqual(self.done.get_nowait(), "exclusive B")
624

    
625
    for _ in xrange(3):
626
      self.assertEqual(self.done.get_nowait(), "exclusive D")
627

    
628
    self.assertRaises(Queue.Empty, self.done.get_nowait)
629

    
630
  @_Repeat
631
  def testMixedAcquireTimeout(self):
632
    sync = threading.Condition()
633

    
634
    def _AcquireShared(ev):
635
      if not self.sl.acquire(shared=1, timeout=None):
636
        return
637

    
638
      self.done.put("shared")
639

    
640
      # Notify main thread
641
      ev.set()
642

    
643
      # Wait for notification
644
      sync.acquire()
645
      try:
646
        sync.wait()
647
      finally:
648
        sync.release()
649

    
650
      # Release lock
651
      self.sl.release()
652

    
653
    acquires = []
654
    for _ in xrange(3):
655
      ev = threading.Event()
656
      self._addThread(target=_AcquireShared, args=(ev, ))
657
      acquires.append(ev)
658

    
659
    # Wait for all acquires to finish
660
    for i in acquires:
661
      i.wait()
662

    
663
    self.assertEqual(self.sl._count_pending(), 0)
664

    
665
    # Try to get exclusive lock
666
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
667

    
668
    # Acquire exclusive without timeout
669
    exclsync = threading.Condition()
670
    exclev = threading.Event()
671

    
672
    def _AcquireExclusive():
673
      if not self.sl.acquire(shared=0):
674
        return
675

    
676
      self.done.put("exclusive")
677

    
678
      # Notify main thread
679
      exclev.set()
680

    
681
      exclsync.acquire()
682
      try:
683
        exclsync.wait()
684
      finally:
685
        exclsync.release()
686

    
687
      self.sl.release()
688

    
689
    self._addThread(target=_AcquireExclusive)
690

    
691
    # Try to get exclusive lock
692
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
693

    
694
    # Make all shared holders release their locks
695
    sync.acquire()
696
    try:
697
      sync.notifyAll()
698
    finally:
699
      sync.release()
700

    
701
    # Wait for exclusive acquire to succeed
702
    exclev.wait()
703

    
704
    self.assertEqual(self.sl._count_pending(), 0)
705

    
706
    # Try to get exclusive lock
707
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
708

    
709
    def _AcquireSharedSimple():
710
      if self.sl.acquire(shared=1, timeout=None):
711
        self.done.put("shared2")
712
        self.sl.release()
713

    
714
    for _ in xrange(10):
715
      self._addThread(target=_AcquireSharedSimple)
716

    
717
    # Tell exclusive lock to release
718
    exclsync.acquire()
719
    try:
720
      exclsync.notifyAll()
721
    finally:
722
      exclsync.release()
723

    
724
    # Wait for everything to finish
725
    self._waitThreads()
726

    
727
    self.assertEqual(self.sl._count_pending(), 0)
728

    
729
    # Check sequence
730
    for _ in xrange(3):
731
      self.assertEqual(self.done.get_nowait(), "shared")
732

    
733
    self.assertEqual(self.done.get_nowait(), "exclusive")
734

    
735
    for _ in xrange(10):
736
      self.assertEqual(self.done.get_nowait(), "shared2")
737

    
738
    self.assertRaises(Queue.Empty, self.done.get_nowait)
739

    
740

    
741
class TestSSynchronizedDecorator(_ThreadedTestCase):
742
  """Shared Lock Synchronized decorator test"""
743

    
744
  def setUp(self):
745
    _ThreadedTestCase.setUp(self)
746

    
747
  @locking.ssynchronized(_decoratorlock)
748
  def _doItExclusive(self):
749
    self.assert_(_decoratorlock._is_owned())
750
    self.done.put('EXC')
751

    
752
  @locking.ssynchronized(_decoratorlock, shared=1)
753
  def _doItSharer(self):
754
    self.assert_(_decoratorlock._is_owned(shared=1))
755
    self.done.put('SHR')
756

    
757
  def testDecoratedFunctions(self):
758
    self._doItExclusive()
759
    self.assert_(not _decoratorlock._is_owned())
760
    self._doItSharer()
761
    self.assert_(not _decoratorlock._is_owned())
762

    
763
  def testSharersCanCoexist(self):
764
    _decoratorlock.acquire(shared=1)
765
    threading.Thread(target=self._doItSharer).start()
766
    self.assert_(self.done.get(True, 1))
767
    _decoratorlock.release()
768

    
769
  @_Repeat
770
  def testExclusiveBlocksExclusive(self):
771
    _decoratorlock.acquire()
772
    self._addThread(target=self._doItExclusive)
773
    # give it a bit of time to check that it's not actually doing anything
774
    self.assertRaises(Queue.Empty, self.done.get_nowait)
775
    _decoratorlock.release()
776
    self._waitThreads()
777
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
778

    
779
  @_Repeat
780
  def testExclusiveBlocksSharer(self):
781
    _decoratorlock.acquire()
782
    self._addThread(target=self._doItSharer)
783
    self.assertRaises(Queue.Empty, self.done.get_nowait)
784
    _decoratorlock.release()
785
    self._waitThreads()
786
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
787

    
788
  @_Repeat
789
  def testSharerBlocksExclusive(self):
790
    _decoratorlock.acquire(shared=1)
791
    self._addThread(target=self._doItExclusive)
792
    self.assertRaises(Queue.Empty, self.done.get_nowait)
793
    _decoratorlock.release()
794
    self._waitThreads()
795
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
796

    
797

    
798
class TestLockSet(_ThreadedTestCase):
799
  """LockSet tests"""
800

    
801
  def setUp(self):
802
    _ThreadedTestCase.setUp(self)
803
    self._setUpLS()
804

    
805
  def _setUpLS(self):
806
    """Helper to (re)initialize the lock set"""
807
    self.resources = ['one', 'two', 'three']
808
    self.ls = locking.LockSet(members=self.resources)
809

    
810
  def testResources(self):
811
    self.assertEquals(self.ls._names(), set(self.resources))
812
    newls = locking.LockSet()
813
    self.assertEquals(newls._names(), set())
814

    
815
  def testAcquireRelease(self):
816
    self.assert_(self.ls.acquire('one'))
817
    self.assertEquals(self.ls._list_owned(), set(['one']))
818
    self.ls.release()
819
    self.assertEquals(self.ls._list_owned(), set())
820
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
821
    self.assertEquals(self.ls._list_owned(), set(['one']))
822
    self.ls.release()
823
    self.assertEquals(self.ls._list_owned(), set())
824
    self.ls.acquire(['one', 'two', 'three'])
825
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
826
    self.ls.release('one')
827
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
828
    self.ls.release(['three'])
829
    self.assertEquals(self.ls._list_owned(), set(['two']))
830
    self.ls.release()
831
    self.assertEquals(self.ls._list_owned(), set())
832
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
833
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
834
    self.ls.release()
835
    self.assertEquals(self.ls._list_owned(), set())
836

    
837
  def testNoDoubleAcquire(self):
838
    self.ls.acquire('one')
839
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
840
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
841
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
842
    self.ls.release()
843
    self.ls.acquire(['one', 'three'])
844
    self.ls.release('one')
845
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
846
    self.ls.release('three')
847

    
848
  def testNoWrongRelease(self):
849
    self.assertRaises(AssertionError, self.ls.release)
850
    self.ls.acquire('one')
851
    self.assertRaises(AssertionError, self.ls.release, 'two')
852

    
853
  def testAddRemove(self):
854
    self.ls.add('four')
855
    self.assertEquals(self.ls._list_owned(), set())
856
    self.assert_('four' in self.ls._names())
857
    self.ls.add(['five', 'six', 'seven'], acquired=1)
858
    self.assert_('five' in self.ls._names())
859
    self.assert_('six' in self.ls._names())
860
    self.assert_('seven' in self.ls._names())
861
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
862
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
863
    self.assert_('five' not in self.ls._names())
864
    self.assert_('six' not in self.ls._names())
865
    self.assertEquals(self.ls._list_owned(), set(['seven']))
866
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
867
    self.ls.remove('seven')
868
    self.assert_('seven' not in self.ls._names())
869
    self.assertEquals(self.ls._list_owned(), set([]))
870
    self.ls.acquire(None, shared=1)
871
    self.assertRaises(AssertionError, self.ls.add, 'eight')
872
    self.ls.release()
873
    self.ls.acquire(None)
874
    self.ls.add('eight', acquired=1)
875
    self.assert_('eight' in self.ls._names())
876
    self.assert_('eight' in self.ls._list_owned())
877
    self.ls.add('nine')
878
    self.assert_('nine' in self.ls._names())
879
    self.assert_('nine' not in self.ls._list_owned())
880
    self.ls.release()
881
    self.ls.remove(['two'])
882
    self.assert_('two' not in self.ls._names())
883
    self.ls.acquire('three')
884
    self.assertEquals(self.ls.remove(['three']), ['three'])
885
    self.assert_('three' not in self.ls._names())
886
    self.assertEquals(self.ls.remove('three'), [])
887
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
888
    self.assert_('one' not in self.ls._names())
889

    
890
  def testRemoveNonBlocking(self):
891
    self.ls.acquire('one')
892
    self.assertEquals(self.ls.remove('one'), ['one'])
893
    self.ls.acquire(['two', 'three'])
894
    self.assertEquals(self.ls.remove(['two', 'three']),
895
                      ['two', 'three'])
896

    
897
  def testNoDoubleAdd(self):
898
    self.assertRaises(errors.LockError, self.ls.add, 'two')
899
    self.ls.add('four')
900
    self.assertRaises(errors.LockError, self.ls.add, 'four')
901

    
902
  def testNoWrongRemoves(self):
903
    self.ls.acquire(['one', 'three'], shared=1)
904
    # Cannot remove 'two' while holding something which is not a superset
905
    self.assertRaises(AssertionError, self.ls.remove, 'two')
906
    # Cannot remove 'three' as we are sharing it
907
    self.assertRaises(AssertionError, self.ls.remove, 'three')
908

    
909
  def testAcquireSetLock(self):
910
    # acquire the set-lock exclusively
911
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
912
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
913
    self.assertEquals(self.ls._is_owned(), True)
914
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
915
    # I can still add/remove elements...
916
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
917
    self.assert_(self.ls.add('six'))
918
    self.ls.release()
919
    # share the set-lock
920
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
921
    # adding new elements is not possible
922
    self.assertRaises(AssertionError, self.ls.add, 'five')
923
    self.ls.release()
924

    
925
  def testAcquireWithRepetitions(self):
926
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
927
                      set(['two', 'two', 'three']))
928
    self.ls.release(['two', 'two'])
929
    self.assertEquals(self.ls._list_owned(), set(['three']))
930

    
931
  def testEmptyAcquire(self):
932
    # Acquire an empty list of locks...
933
    self.assertEquals(self.ls.acquire([]), set())
934
    self.assertEquals(self.ls._list_owned(), set())
935
    # New locks can still be addded
936
    self.assert_(self.ls.add('six'))
937
    # "re-acquiring" is not an issue, since we had really acquired nothing
938
    self.assertEquals(self.ls.acquire([], shared=1), set())
939
    self.assertEquals(self.ls._list_owned(), set())
940
    # We haven't really acquired anything, so we cannot release
941
    self.assertRaises(AssertionError, self.ls.release)
942

    
943
  def _doLockSet(self, names, shared):
944
    try:
945
      self.ls.acquire(names, shared=shared)
946
      self.done.put('DONE')
947
      self.ls.release()
948
    except errors.LockError:
949
      self.done.put('ERR')
950

    
951
  def _doAddSet(self, names):
952
    try:
953
      self.ls.add(names, acquired=1)
954
      self.done.put('DONE')
955
      self.ls.release()
956
    except errors.LockError:
957
      self.done.put('ERR')
958

    
959
  def _doRemoveSet(self, names):
960
    self.done.put(self.ls.remove(names))
961

    
962
  @_Repeat
963
  def testConcurrentSharedAcquire(self):
964
    self.ls.acquire(['one', 'two'], shared=1)
965
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
966
    self._waitThreads()
967
    self.assertEqual(self.done.get_nowait(), 'DONE')
968
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
969
    self._waitThreads()
970
    self.assertEqual(self.done.get_nowait(), 'DONE')
971
    self._addThread(target=self._doLockSet, args=('three', 1))
972
    self._waitThreads()
973
    self.assertEqual(self.done.get_nowait(), 'DONE')
974
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
975
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
976
    self.assertRaises(Queue.Empty, self.done.get_nowait)
977
    self.ls.release()
978
    self._waitThreads()
979
    self.assertEqual(self.done.get_nowait(), 'DONE')
980
    self.assertEqual(self.done.get_nowait(), 'DONE')
981

    
982
  @_Repeat
983
  def testConcurrentExclusiveAcquire(self):
984
    self.ls.acquire(['one', 'two'])
985
    self._addThread(target=self._doLockSet, args=('three', 1))
986
    self._waitThreads()
987
    self.assertEqual(self.done.get_nowait(), 'DONE')
988
    self._addThread(target=self._doLockSet, args=('three', 0))
989
    self._waitThreads()
990
    self.assertEqual(self.done.get_nowait(), 'DONE')
991
    self.assertRaises(Queue.Empty, self.done.get_nowait)
992
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
993
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
994
    self._addThread(target=self._doLockSet, args=('one', 0))
995
    self._addThread(target=self._doLockSet, args=('one', 1))
996
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
997
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
998
    self.assertRaises(Queue.Empty, self.done.get_nowait)
999
    self.ls.release()
1000
    self._waitThreads()
1001
    for _ in range(6):
1002
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1003

    
1004
  @_Repeat
1005
  def testConcurrentRemove(self):
1006
    self.ls.add('four')
1007
    self.ls.acquire(['one', 'two', 'four'])
1008
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1009
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1010
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1011
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1012
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1013
    self.ls.remove('one')
1014
    self.ls.release()
1015
    self._waitThreads()
1016
    for i in range(4):
1017
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1018
    self.ls.add(['five', 'six'], acquired=1)
1019
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1020
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1021
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1022
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1023
    self.ls.remove('five')
1024
    self.ls.release()
1025
    self._waitThreads()
1026
    for i in range(4):
1027
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1028
    self.ls.acquire(['three', 'four'])
1029
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1030
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1031
    self.ls.remove('four')
1032
    self._waitThreads()
1033
    self.assertEqual(self.done.get_nowait(), ['six'])
1034
    self._addThread(target=self._doRemoveSet, args=(['two']))
1035
    self._waitThreads()
1036
    self.assertEqual(self.done.get_nowait(), ['two'])
1037
    self.ls.release()
1038
    # reset lockset
1039
    self._setUpLS()
1040

    
1041
  @_Repeat
1042
  def testConcurrentSharedSetLock(self):
1043
    # share the set-lock...
1044
    self.ls.acquire(None, shared=1)
1045
    # ...another thread can share it too
1046
    self._addThread(target=self._doLockSet, args=(None, 1))
1047
    self._waitThreads()
1048
    self.assertEqual(self.done.get_nowait(), 'DONE')
1049
    # ...or just share some elements
1050
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1051
    self._waitThreads()
1052
    self.assertEqual(self.done.get_nowait(), 'DONE')
1053
    # ...but not add new ones or remove any
1054
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1055
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1056
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1057
    # this just releases the set-lock
1058
    self.ls.release([])
1059
    t.join(60)
1060
    self.assertEqual(self.done.get_nowait(), 'DONE')
1061
    # release the lock on the actual elements so remove() can proceed too
1062
    self.ls.release()
1063
    self._waitThreads()
1064
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1065
    # reset lockset
1066
    self._setUpLS()
1067

    
1068
  @_Repeat
1069
  def testConcurrentExclusiveSetLock(self):
1070
    # acquire the set-lock...
1071
    self.ls.acquire(None, shared=0)
1072
    # ...no one can do anything else
1073
    self._addThread(target=self._doLockSet, args=(None, 1))
1074
    self._addThread(target=self._doLockSet, args=(None, 0))
1075
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1076
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1077
    self._addThread(target=self._doAddSet, args=(['nine']))
1078
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1079
    self.ls.release()
1080
    self._waitThreads()
1081
    for _ in range(5):
1082
      self.assertEqual(self.done.get(True, 1), 'DONE')
1083
    # cleanup
1084
    self._setUpLS()
1085

    
1086
  @_Repeat
1087
  def testConcurrentSetLockAdd(self):
1088
    self.ls.acquire('one')
1089
    # Another thread wants the whole SetLock
1090
    self._addThread(target=self._doLockSet, args=(None, 0))
1091
    self._addThread(target=self._doLockSet, args=(None, 1))
1092
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1093
    self.assertRaises(AssertionError, self.ls.add, 'four')
1094
    self.ls.release()
1095
    self._waitThreads()
1096
    self.assertEqual(self.done.get_nowait(), 'DONE')
1097
    self.assertEqual(self.done.get_nowait(), 'DONE')
1098
    self.ls.acquire(None)
1099
    self._addThread(target=self._doLockSet, args=(None, 0))
1100
    self._addThread(target=self._doLockSet, args=(None, 1))
1101
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1102
    self.ls.add('four')
1103
    self.ls.add('five', acquired=1)
1104
    self.ls.add('six', acquired=1, shared=1)
1105
    self.assertEquals(self.ls._list_owned(),
1106
      set(['one', 'two', 'three', 'five', 'six']))
1107
    self.assertEquals(self.ls._is_owned(), True)
1108
    self.assertEquals(self.ls._names(),
1109
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1110
    self.ls.release()
1111
    self._waitThreads()
1112
    self.assertEqual(self.done.get_nowait(), 'DONE')
1113
    self.assertEqual(self.done.get_nowait(), 'DONE')
1114
    self._setUpLS()
1115

    
1116
  @_Repeat
1117
  def testEmptyLockSet(self):
1118
    # get the set-lock
1119
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1120
    # now empty it...
1121
    self.ls.remove(['one', 'two', 'three'])
1122
    # and adds/locks by another thread still wait
1123
    self._addThread(target=self._doAddSet, args=(['nine']))
1124
    self._addThread(target=self._doLockSet, args=(None, 1))
1125
    self._addThread(target=self._doLockSet, args=(None, 0))
1126
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1127
    self.ls.release()
1128
    self._waitThreads()
1129
    for _ in range(3):
1130
      self.assertEqual(self.done.get_nowait(), 'DONE')
1131
    # empty it again...
1132
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1133
    # now share it...
1134
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1135
    # other sharers can go, adds still wait
1136
    self._addThread(target=self._doLockSet, args=(None, 1))
1137
    self._waitThreads()
1138
    self.assertEqual(self.done.get_nowait(), 'DONE')
1139
    self._addThread(target=self._doAddSet, args=(['nine']))
1140
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1141
    self.ls.release()
1142
    self._waitThreads()
1143
    self.assertEqual(self.done.get_nowait(), 'DONE')
1144
    self._setUpLS()
1145

    
1146

    
1147
class TestGanetiLockManager(_ThreadedTestCase):
1148

    
1149
  def setUp(self):
1150
    _ThreadedTestCase.setUp(self)
1151
    self.nodes=['n1', 'n2']
1152
    self.instances=['i1', 'i2', 'i3']
1153
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1154
                                        instances=self.instances)
1155

    
1156
  def tearDown(self):
1157
    # Don't try this at home...
1158
    locking.GanetiLockManager._instance = None
1159

    
1160
  def testLockingConstants(self):
1161
    # The locking library internally cheats by assuming its constants have some
1162
    # relationships with each other. Check those hold true.
1163
    # This relationship is also used in the Processor to recursively acquire
1164
    # the right locks. Again, please don't break it.
1165
    for i in range(len(locking.LEVELS)):
1166
      self.assertEqual(i, locking.LEVELS[i])
1167

    
1168
  def testDoubleGLFails(self):
1169
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1170

    
1171
  def testLockNames(self):
1172
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1173
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1174
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1175
                     set(self.instances))
1176

    
1177
  def testInitAndResources(self):
1178
    locking.GanetiLockManager._instance = None
1179
    self.GL = locking.GanetiLockManager()
1180
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1181
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1182
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1183

    
1184
    locking.GanetiLockManager._instance = None
1185
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1186
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1187
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1188
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1189

    
1190
    locking.GanetiLockManager._instance = None
1191
    self.GL = locking.GanetiLockManager(instances=self.instances)
1192
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1193
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1194
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1195
                     set(self.instances))
1196

    
1197
  def testAcquireRelease(self):
1198
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1199
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1200
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1201
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1202
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1203
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1204
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1205
    self.GL.release(locking.LEVEL_NODE)
1206
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1207
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1208
    self.GL.release(locking.LEVEL_INSTANCE)
1209
    self.assertRaises(errors.LockError, self.GL.acquire,
1210
                      locking.LEVEL_INSTANCE, ['i5'])
1211
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1212
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1213

    
1214
  def testAcquireWholeSets(self):
1215
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1216
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1217
                      set(self.instances))
1218
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1219
                      set(self.instances))
1220
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1221
                      set(self.nodes))
1222
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1223
                      set(self.nodes))
1224
    self.GL.release(locking.LEVEL_NODE)
1225
    self.GL.release(locking.LEVEL_INSTANCE)
1226
    self.GL.release(locking.LEVEL_CLUSTER)
1227

    
1228
  def testAcquireWholeAndPartial(self):
1229
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1230
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1231
                      set(self.instances))
1232
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1233
                      set(self.instances))
1234
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1235
                      set(['n2']))
1236
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1237
                      set(['n2']))
1238
    self.GL.release(locking.LEVEL_NODE)
1239
    self.GL.release(locking.LEVEL_INSTANCE)
1240
    self.GL.release(locking.LEVEL_CLUSTER)
1241

    
1242
  def testBGLDependency(self):
1243
    self.assertRaises(AssertionError, self.GL.acquire,
1244
                      locking.LEVEL_NODE, ['n1', 'n2'])
1245
    self.assertRaises(AssertionError, self.GL.acquire,
1246
                      locking.LEVEL_INSTANCE, ['i3'])
1247
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1248
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1249
    self.assertRaises(AssertionError, self.GL.release,
1250
                      locking.LEVEL_CLUSTER, ['BGL'])
1251
    self.assertRaises(AssertionError, self.GL.release,
1252
                      locking.LEVEL_CLUSTER)
1253
    self.GL.release(locking.LEVEL_NODE)
1254
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1255
    self.assertRaises(AssertionError, self.GL.release,
1256
                      locking.LEVEL_CLUSTER, ['BGL'])
1257
    self.assertRaises(AssertionError, self.GL.release,
1258
                      locking.LEVEL_CLUSTER)
1259
    self.GL.release(locking.LEVEL_INSTANCE)
1260

    
1261
  def testWrongOrder(self):
1262
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1263
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1264
    self.assertRaises(AssertionError, self.GL.acquire,
1265
                      locking.LEVEL_NODE, ['n1'])
1266
    self.assertRaises(AssertionError, self.GL.acquire,
1267
                      locking.LEVEL_INSTANCE, ['i2'])
1268

    
1269
  # Helper function to run as a thread that shared the BGL and then acquires
1270
  # some locks at another level.
1271
  def _doLock(self, level, names, shared):
1272
    try:
1273
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1274
      self.GL.acquire(level, names, shared=shared)
1275
      self.done.put('DONE')
1276
      self.GL.release(level)
1277
      self.GL.release(locking.LEVEL_CLUSTER)
1278
    except errors.LockError:
1279
      self.done.put('ERR')
1280

    
1281
  @_Repeat
1282
  def testConcurrency(self):
1283
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1284
    self._addThread(target=self._doLock,
1285
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1286
    self._waitThreads()
1287
    self.assertEqual(self.done.get_nowait(), 'DONE')
1288
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1289
    self._addThread(target=self._doLock,
1290
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1291
    self._waitThreads()
1292
    self.assertEqual(self.done.get_nowait(), 'DONE')
1293
    self._addThread(target=self._doLock,
1294
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1295
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1296
    self.GL.release(locking.LEVEL_INSTANCE)
1297
    self._waitThreads()
1298
    self.assertEqual(self.done.get_nowait(), 'DONE')
1299
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1300
    self._addThread(target=self._doLock,
1301
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1302
    self._waitThreads()
1303
    self.assertEqual(self.done.get_nowait(), 'DONE')
1304
    self._addThread(target=self._doLock,
1305
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1306
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1307
    self.GL.release(locking.LEVEL_INSTANCE)
1308
    self._waitThreads()
1309
    self.assertEqual(self.done.get(True, 1), 'DONE')
1310
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1311

    
1312

    
1313
if __name__ == '__main__':
1314
  unittest.main()
1315
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1316
  #unittest.TextTestRunner(verbosity=2).run(suite)