Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 5aab242c

History | View | Annotate | Download (46.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
def SafeSleep(duration):
52
  start = time.time()
53
  while True:
54
    delay = start + duration - time.time()
55
    if delay <= 0.0:
56
      break
57
    time.sleep(delay)
58

    
59

    
60
class _ThreadedTestCase(unittest.TestCase):
61
  """Test class that supports adding/waiting on threads"""
62
  def setUp(self):
63
    unittest.TestCase.setUp(self)
64
    self.done = Queue.Queue(0)
65
    self.threads = []
66

    
67
  def _addThread(self, *args, **kwargs):
68
    """Create and remember a new thread"""
69
    t = threading.Thread(*args, **kwargs)
70
    self.threads.append(t)
71
    t.start()
72
    return t
73

    
74
  def _waitThreads(self):
75
    """Wait for all our threads to finish"""
76
    for t in self.threads:
77
      t.join(60)
78
      self.failIf(t.isAlive())
79
    self.threads = []
80

    
81

    
82
class _ConditionTestCase(_ThreadedTestCase):
83
  """Common test case for conditions"""
84

    
85
  def setUp(self, cls):
86
    _ThreadedTestCase.setUp(self)
87
    self.lock = threading.Lock()
88
    self.cond = cls(self.lock)
89

    
90
  def _testAcquireRelease(self):
91
    self.assert_(not self.cond._is_owned())
92
    self.assertRaises(RuntimeError, self.cond.wait)
93
    self.assertRaises(RuntimeError, self.cond.notifyAll)
94

    
95
    self.cond.acquire()
96
    self.assert_(self.cond._is_owned())
97
    self.cond.notifyAll()
98
    self.assert_(self.cond._is_owned())
99
    self.cond.release()
100

    
101
    self.assert_(not self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
  def _testNotification(self):
106
    def _NotifyAll():
107
      self.done.put("NE")
108
      self.cond.acquire()
109
      self.done.put("NA")
110
      self.cond.notifyAll()
111
      self.done.put("NN")
112
      self.cond.release()
113

    
114
    self.cond.acquire()
115
    self._addThread(target=_NotifyAll)
116
    self.assertEqual(self.done.get(True, 1), "NE")
117
    self.assertRaises(Queue.Empty, self.done.get_nowait)
118
    self.cond.wait()
119
    self.assertEqual(self.done.get(True, 1), "NA")
120
    self.assertEqual(self.done.get(True, 1), "NN")
121
    self.assert_(self.cond._is_owned())
122
    self.cond.release()
123
    self.assert_(not self.cond._is_owned())
124

    
125

    
126
class TestSingleNotifyPipeCondition(_ConditionTestCase):
127
  """SingleNotifyPipeCondition tests"""
128

    
129
  def setUp(self):
130
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
131

    
132
  def testAcquireRelease(self):
133
    self._testAcquireRelease()
134

    
135
  def testNotification(self):
136
    self._testNotification()
137

    
138
  def testWaitReuse(self):
139
    self.cond.acquire()
140
    self.cond.wait(0)
141
    self.cond.wait(0.1)
142
    self.cond.release()
143

    
144
  def testNoNotifyReuse(self):
145
    self.cond.acquire()
146
    self.cond.notifyAll()
147
    self.assertRaises(RuntimeError, self.cond.wait)
148
    self.assertRaises(RuntimeError, self.cond.notifyAll)
149
    self.cond.release()
150

    
151

    
152
class TestPipeCondition(_ConditionTestCase):
153
  """PipeCondition tests"""
154

    
155
  def setUp(self):
156
    _ConditionTestCase.setUp(self, locking.PipeCondition)
157

    
158
  def testAcquireRelease(self):
159
    self._testAcquireRelease()
160

    
161
  def testNotification(self):
162
    self._testNotification()
163

    
164
  def _TestWait(self, fn):
165
    self._addThread(target=fn)
166
    self._addThread(target=fn)
167
    self._addThread(target=fn)
168

    
169
    # Wait for threads to be waiting
170
    self.assertEqual(self.done.get(True, 1), "A")
171
    self.assertEqual(self.done.get(True, 1), "A")
172
    self.assertEqual(self.done.get(True, 1), "A")
173

    
174
    self.assertRaises(Queue.Empty, self.done.get_nowait)
175

    
176
    self.cond.acquire()
177
    self.assertEqual(self.cond._nwaiters, 3)
178
    # This new thread can"t acquire the lock, and thus call wait, before we
179
    # release it
180
    self._addThread(target=fn)
181
    self.cond.notifyAll()
182
    self.assertRaises(Queue.Empty, self.done.get_nowait)
183
    self.cond.release()
184

    
185
    # We should now get 3 W and 1 A (for the new thread) in whatever order
186
    w = 0
187
    a = 0
188
    for i in range(4):
189
      got = self.done.get(True, 1)
190
      if got == "W":
191
        w += 1
192
      elif got == "A":
193
        a += 1
194
      else:
195
        self.fail("Got %s on the done queue" % got)
196

    
197
    self.assertEqual(w, 3)
198
    self.assertEqual(a, 1)
199

    
200
    self.cond.acquire()
201
    self.cond.notifyAll()
202
    self.cond.release()
203
    self._waitThreads()
204
    self.assertEqual(self.done.get_nowait(), "W")
205
    self.assertRaises(Queue.Empty, self.done.get_nowait)
206

    
207
  def testBlockingWait(self):
208
    def _BlockingWait():
209
      self.cond.acquire()
210
      self.done.put("A")
211
      self.cond.wait()
212
      self.cond.release()
213
      self.done.put("W")
214

    
215
    self._TestWait(_BlockingWait)
216

    
217
  def testLongTimeoutWait(self):
218
    def _Helper():
219
      self.cond.acquire()
220
      self.done.put("A")
221
      self.cond.wait(15.0)
222
      self.cond.release()
223
      self.done.put("W")
224

    
225
    self._TestWait(_Helper)
226

    
227
  def _TimeoutWait(self, timeout, check):
228
    self.cond.acquire()
229
    self.cond.wait(timeout)
230
    self.cond.release()
231
    self.done.put(check)
232

    
233
  def testShortTimeoutWait(self):
234
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
235
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
236
    self._waitThreads()
237
    self.assertEqual(self.done.get_nowait(), "T1")
238
    self.assertEqual(self.done.get_nowait(), "T1")
239
    self.assertRaises(Queue.Empty, self.done.get_nowait)
240

    
241
  def testZeroTimeoutWait(self):
242
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
243
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
244
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
245
    self._waitThreads()
246
    self.assertEqual(self.done.get_nowait(), "T0")
247
    self.assertEqual(self.done.get_nowait(), "T0")
248
    self.assertEqual(self.done.get_nowait(), "T0")
249
    self.assertRaises(Queue.Empty, self.done.get_nowait)
250

    
251

    
252
class TestSharedLock(_ThreadedTestCase):
253
  """SharedLock tests"""
254

    
255
  def setUp(self):
256
    _ThreadedTestCase.setUp(self)
257
    self.sl = locking.SharedLock()
258

    
259
  def testSequenceAndOwnership(self):
260
    self.assert_(not self.sl._is_owned())
261
    self.sl.acquire(shared=1)
262
    self.assert_(self.sl._is_owned())
263
    self.assert_(self.sl._is_owned(shared=1))
264
    self.assert_(not self.sl._is_owned(shared=0))
265
    self.sl.release()
266
    self.assert_(not self.sl._is_owned())
267
    self.sl.acquire()
268
    self.assert_(self.sl._is_owned())
269
    self.assert_(not self.sl._is_owned(shared=1))
270
    self.assert_(self.sl._is_owned(shared=0))
271
    self.sl.release()
272
    self.assert_(not self.sl._is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl._is_owned())
275
    self.assert_(self.sl._is_owned(shared=1))
276
    self.assert_(not self.sl._is_owned(shared=0))
277
    self.sl.release()
278
    self.assert_(not self.sl._is_owned())
279

    
280
  def testBooleanValue(self):
281
    # semaphores are supposed to return a true value on a successful acquire
282
    self.assert_(self.sl.acquire(shared=1))
283
    self.sl.release()
284
    self.assert_(self.sl.acquire())
285
    self.sl.release()
286

    
287
  def testDoubleLockingStoE(self):
288
    self.sl.acquire(shared=1)
289
    self.assertRaises(AssertionError, self.sl.acquire)
290

    
291
  def testDoubleLockingEtoS(self):
292
    self.sl.acquire()
293
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
294

    
295
  def testDoubleLockingStoS(self):
296
    self.sl.acquire(shared=1)
297
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
298

    
299
  def testDoubleLockingEtoE(self):
300
    self.sl.acquire()
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  # helper functions: called in a separate thread they acquire the lock, send
304
  # their identifier on the done queue, then release it.
305
  def _doItSharer(self):
306
    try:
307
      self.sl.acquire(shared=1)
308
      self.done.put('SHR')
309
      self.sl.release()
310
    except errors.LockError:
311
      self.done.put('ERR')
312

    
313
  def _doItExclusive(self):
314
    try:
315
      self.sl.acquire()
316
      self.done.put('EXC')
317
      self.sl.release()
318
    except errors.LockError:
319
      self.done.put('ERR')
320

    
321
  def _doItDelete(self):
322
    try:
323
      self.sl.delete()
324
      self.done.put('DEL')
325
    except errors.LockError:
326
      self.done.put('ERR')
327

    
328
  def testSharersCanCoexist(self):
329
    self.sl.acquire(shared=1)
330
    threading.Thread(target=self._doItSharer).start()
331
    self.assert_(self.done.get(True, 1))
332
    self.sl.release()
333

    
334
  @_Repeat
335
  def testExclusiveBlocksExclusive(self):
336
    self.sl.acquire()
337
    self._addThread(target=self._doItExclusive)
338
    self.assertRaises(Queue.Empty, self.done.get_nowait)
339
    self.sl.release()
340
    self._waitThreads()
341
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
342

    
343
  @_Repeat
344
  def testExclusiveBlocksDelete(self):
345
    self.sl.acquire()
346
    self._addThread(target=self._doItDelete)
347
    self.assertRaises(Queue.Empty, self.done.get_nowait)
348
    self.sl.release()
349
    self._waitThreads()
350
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
351
    self.sl = locking.SharedLock()
352

    
353
  @_Repeat
354
  def testExclusiveBlocksSharer(self):
355
    self.sl.acquire()
356
    self._addThread(target=self._doItSharer)
357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
358
    self.sl.release()
359
    self._waitThreads()
360
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
361

    
362
  @_Repeat
363
  def testSharerBlocksExclusive(self):
364
    self.sl.acquire(shared=1)
365
    self._addThread(target=self._doItExclusive)
366
    self.assertRaises(Queue.Empty, self.done.get_nowait)
367
    self.sl.release()
368
    self._waitThreads()
369
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
370

    
371
  @_Repeat
372
  def testSharerBlocksDelete(self):
373
    self.sl.acquire(shared=1)
374
    self._addThread(target=self._doItDelete)
375
    self.assertRaises(Queue.Empty, self.done.get_nowait)
376
    self.sl.release()
377
    self._waitThreads()
378
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
379
    self.sl = locking.SharedLock()
380

    
381
  @_Repeat
382
  def testWaitingExclusiveBlocksSharer(self):
383
    """SKIPPED testWaitingExclusiveBlockSharer"""
384
    return
385

    
386
    self.sl.acquire(shared=1)
387
    # the lock is acquired in shared mode...
388
    self._addThread(target=self._doItExclusive)
389
    # ...but now an exclusive is waiting...
390
    self._addThread(target=self._doItSharer)
391
    # ...so the sharer should be blocked as well
392
    self.assertRaises(Queue.Empty, self.done.get_nowait)
393
    self.sl.release()
394
    self._waitThreads()
395
    # The exclusive passed before
396
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
397
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
398

    
399
  @_Repeat
400
  def testWaitingSharerBlocksExclusive(self):
401
    """SKIPPED testWaitingSharerBlocksExclusive"""
402
    return
403

    
404
    self.sl.acquire()
405
    # the lock is acquired in exclusive mode...
406
    self._addThread(target=self._doItSharer)
407
    # ...but now a sharer is waiting...
408
    self._addThread(target=self._doItExclusive)
409
    # ...the exclusive is waiting too...
410
    self.assertRaises(Queue.Empty, self.done.get_nowait)
411
    self.sl.release()
412
    self._waitThreads()
413
    # The sharer passed before
414
    self.assertEqual(self.done.get_nowait(), 'SHR')
415
    self.assertEqual(self.done.get_nowait(), 'EXC')
416

    
417
  def testDelete(self):
418
    self.sl.delete()
419
    self.assertRaises(errors.LockError, self.sl.acquire)
420
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
421
    self.assertRaises(errors.LockError, self.sl.delete)
422

    
423
  def testDeleteTimeout(self):
424
    self.sl.delete(timeout=60)
425

    
426
  def testNoDeleteIfSharer(self):
427
    self.sl.acquire(shared=1)
428
    self.assertRaises(AssertionError, self.sl.delete)
429

    
430
  @_Repeat
431
  def testDeletePendingSharersExclusiveDelete(self):
432
    self.sl.acquire()
433
    self._addThread(target=self._doItSharer)
434
    self._addThread(target=self._doItSharer)
435
    self._addThread(target=self._doItExclusive)
436
    self._addThread(target=self._doItDelete)
437
    self.sl.delete()
438
    self._waitThreads()
439
    # The threads who were pending return ERR
440
    for _ in range(4):
441
      self.assertEqual(self.done.get_nowait(), 'ERR')
442
    self.sl = locking.SharedLock()
443

    
444
  @_Repeat
445
  def testDeletePendingDeleteExclusiveSharers(self):
446
    self.sl.acquire()
447
    self._addThread(target=self._doItDelete)
448
    self._addThread(target=self._doItExclusive)
449
    self._addThread(target=self._doItSharer)
450
    self._addThread(target=self._doItSharer)
451
    self.sl.delete()
452
    self._waitThreads()
453
    # The two threads who were pending return both ERR
454
    self.assertEqual(self.done.get_nowait(), 'ERR')
455
    self.assertEqual(self.done.get_nowait(), 'ERR')
456
    self.assertEqual(self.done.get_nowait(), 'ERR')
457
    self.assertEqual(self.done.get_nowait(), 'ERR')
458
    self.sl = locking.SharedLock()
459

    
460
  @_Repeat
461
  def testExclusiveAcquireTimeout(self):
462
    for shared in [0, 1]:
463
      on_queue = threading.Event()
464
      release_exclusive = threading.Event()
465

    
466
      def _LockExclusive():
467
        self.sl.acquire(shared=0, test_notify=on_queue.set)
468
        self.done.put("A: start wait")
469
        release_exclusive.wait()
470
        self.done.put("A: end wait")
471
        self.sl.release()
472

    
473
      # Start thread to hold lock in exclusive mode
474
      self._addThread(target=_LockExclusive)
475

    
476
      # Wait for wait to begin
477
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
478

    
479
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
480
      # on the queue
481
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
482
                                      test_notify=release_exclusive.set))
483

    
484
      self.done.put("got 2nd")
485
      self.sl.release()
486

    
487
      self._waitThreads()
488

    
489
      self.assertEqual(self.done.get_nowait(), "A: end wait")
490
      self.assertEqual(self.done.get_nowait(), "got 2nd")
491
      self.assertRaises(Queue.Empty, self.done.get_nowait)
492

    
493
  @_Repeat
494
  def testAcquireExpiringTimeout(self):
495
    def _AcquireWithTimeout(shared, timeout):
496
      if not self.sl.acquire(shared=shared, timeout=timeout):
497
        self.done.put("timeout")
498

    
499
    for shared in [0, 1]:
500
      # Lock exclusively
501
      self.sl.acquire()
502

    
503
      # Start shared acquires with timeout between 0 and 20 ms
504
      for i in range(11):
505
        self._addThread(target=_AcquireWithTimeout,
506
                        args=(shared, i * 2.0 / 1000.0))
507

    
508
      # Wait for threads to finish (makes sure the acquire timeout expires
509
      # before releasing the lock)
510
      self._waitThreads()
511

    
512
      # Release lock
513
      self.sl.release()
514

    
515
      for _ in range(11):
516
        self.assertEqual(self.done.get_nowait(), "timeout")
517

    
518
      self.assertRaises(Queue.Empty, self.done.get_nowait)
519

    
520
  @_Repeat
521
  def testSharedSkipExclusiveAcquires(self):
522
    # Tests whether shared acquires jump in front of exclusive acquires in the
523
    # queue.
524

    
525
    def _Acquire(shared, name, notify_ev, wait_ev):
526
      if notify_ev:
527
        notify_fn = notify_ev.set
528
      else:
529
        notify_fn = None
530

    
531
      if wait_ev:
532
        wait_ev.wait()
533

    
534
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
535
        return
536

    
537
      self.done.put(name)
538
      self.sl.release()
539

    
540
    # Get exclusive lock while we fill the queue
541
    self.sl.acquire()
542

    
543
    shrcnt1 = 5
544
    shrcnt2 = 7
545
    shrcnt3 = 9
546
    shrcnt4 = 2
547

    
548
    # Add acquires using threading.Event for synchronization. They'll be
549
    # acquired exactly in the order defined in this list.
550
    acquires = (shrcnt1 * [(1, "shared 1")] +
551
                3 * [(0, "exclusive 1")] +
552
                shrcnt2 * [(1, "shared 2")] +
553
                shrcnt3 * [(1, "shared 3")] +
554
                shrcnt4 * [(1, "shared 4")] +
555
                3 * [(0, "exclusive 2")])
556

    
557
    ev_cur = None
558
    ev_prev = None
559

    
560
    for args in acquires:
561
      ev_cur = threading.Event()
562
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
563
      ev_prev = ev_cur
564

    
565
    # Wait for last acquire to start
566
    ev_prev.wait()
567

    
568
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
569
    # together
570
    self.assertEqual(self.sl._count_pending(), 7)
571

    
572
    # Release exclusive lock and wait
573
    self.sl.release()
574

    
575
    self._waitThreads()
576

    
577
    # Check sequence
578
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
579
      # Shared locks aren't guaranteed to be notified in order, but they'll be
580
      # first
581
      tmp = self.done.get_nowait()
582
      if tmp == "shared 1":
583
        shrcnt1 -= 1
584
      elif tmp == "shared 2":
585
        shrcnt2 -= 1
586
      elif tmp == "shared 3":
587
        shrcnt3 -= 1
588
      elif tmp == "shared 4":
589
        shrcnt4 -= 1
590
    self.assertEqual(shrcnt1, 0)
591
    self.assertEqual(shrcnt2, 0)
592
    self.assertEqual(shrcnt3, 0)
593
    self.assertEqual(shrcnt3, 0)
594

    
595
    for _ in range(3):
596
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
597

    
598
    for _ in range(3):
599
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
600

    
601
    self.assertRaises(Queue.Empty, self.done.get_nowait)
602

    
603
  @_Repeat
604
  def testMixedAcquireTimeout(self):
605
    sync = threading.Condition()
606

    
607
    def _AcquireShared(ev):
608
      if not self.sl.acquire(shared=1, timeout=None):
609
        return
610

    
611
      self.done.put("shared")
612

    
613
      # Notify main thread
614
      ev.set()
615

    
616
      # Wait for notification
617
      sync.acquire()
618
      try:
619
        sync.wait()
620
      finally:
621
        sync.release()
622

    
623
      # Release lock
624
      self.sl.release()
625

    
626
    acquires = []
627
    for _ in range(3):
628
      ev = threading.Event()
629
      self._addThread(target=_AcquireShared, args=(ev, ))
630
      acquires.append(ev)
631

    
632
    # Wait for all acquires to finish
633
    for i in acquires:
634
      i.wait()
635

    
636
    self.assertEqual(self.sl._count_pending(), 0)
637

    
638
    # Try to get exclusive lock
639
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
640

    
641
    # Acquire exclusive without timeout
642
    exclsync = threading.Condition()
643
    exclev = threading.Event()
644

    
645
    def _AcquireExclusive():
646
      if not self.sl.acquire(shared=0):
647
        return
648

    
649
      self.done.put("exclusive")
650

    
651
      # Notify main thread
652
      exclev.set()
653

    
654
      exclsync.acquire()
655
      try:
656
        exclsync.wait()
657
      finally:
658
        exclsync.release()
659

    
660
      self.sl.release()
661

    
662
    self._addThread(target=_AcquireExclusive)
663

    
664
    # Try to get exclusive lock
665
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
666

    
667
    # Make all shared holders release their locks
668
    sync.acquire()
669
    try:
670
      sync.notifyAll()
671
    finally:
672
      sync.release()
673

    
674
    # Wait for exclusive acquire to succeed
675
    exclev.wait()
676

    
677
    self.assertEqual(self.sl._count_pending(), 0)
678

    
679
    # Try to get exclusive lock
680
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
681

    
682
    def _AcquireSharedSimple():
683
      if self.sl.acquire(shared=1, timeout=None):
684
        self.done.put("shared2")
685
        self.sl.release()
686

    
687
    for _ in range(10):
688
      self._addThread(target=_AcquireSharedSimple)
689

    
690
    # Tell exclusive lock to release
691
    exclsync.acquire()
692
    try:
693
      exclsync.notifyAll()
694
    finally:
695
      exclsync.release()
696

    
697
    # Wait for everything to finish
698
    self._waitThreads()
699

    
700
    self.assertEqual(self.sl._count_pending(), 0)
701

    
702
    # Check sequence
703
    for _ in range(3):
704
      self.assertEqual(self.done.get_nowait(), "shared")
705

    
706
    self.assertEqual(self.done.get_nowait(), "exclusive")
707

    
708
    for _ in range(10):
709
      self.assertEqual(self.done.get_nowait(), "shared2")
710

    
711
    self.assertRaises(Queue.Empty, self.done.get_nowait)
712

    
713

    
714
class TestSSynchronizedDecorator(_ThreadedTestCase):
715
  """Shared Lock Synchronized decorator test"""
716

    
717
  def setUp(self):
718
    _ThreadedTestCase.setUp(self)
719

    
720
  @locking.ssynchronized(_decoratorlock)
721
  def _doItExclusive(self):
722
    self.assert_(_decoratorlock._is_owned())
723
    self.done.put('EXC')
724

    
725
  @locking.ssynchronized(_decoratorlock, shared=1)
726
  def _doItSharer(self):
727
    self.assert_(_decoratorlock._is_owned(shared=1))
728
    self.done.put('SHR')
729

    
730
  def testDecoratedFunctions(self):
731
    self._doItExclusive()
732
    self.assert_(not _decoratorlock._is_owned())
733
    self._doItSharer()
734
    self.assert_(not _decoratorlock._is_owned())
735

    
736
  def testSharersCanCoexist(self):
737
    _decoratorlock.acquire(shared=1)
738
    threading.Thread(target=self._doItSharer).start()
739
    self.assert_(self.done.get(True, 1))
740
    _decoratorlock.release()
741

    
742
  @_Repeat
743
  def testExclusiveBlocksExclusive(self):
744
    _decoratorlock.acquire()
745
    self._addThread(target=self._doItExclusive)
746
    # give it a bit of time to check that it's not actually doing anything
747
    self.assertRaises(Queue.Empty, self.done.get_nowait)
748
    _decoratorlock.release()
749
    self._waitThreads()
750
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
751

    
752
  @_Repeat
753
  def testExclusiveBlocksSharer(self):
754
    _decoratorlock.acquire()
755
    self._addThread(target=self._doItSharer)
756
    self.assertRaises(Queue.Empty, self.done.get_nowait)
757
    _decoratorlock.release()
758
    self._waitThreads()
759
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
760

    
761
  @_Repeat
762
  def testSharerBlocksExclusive(self):
763
    _decoratorlock.acquire(shared=1)
764
    self._addThread(target=self._doItExclusive)
765
    self.assertRaises(Queue.Empty, self.done.get_nowait)
766
    _decoratorlock.release()
767
    self._waitThreads()
768
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
769

    
770

    
771
class TestLockSet(_ThreadedTestCase):
772
  """LockSet tests"""
773

    
774
  def setUp(self):
775
    _ThreadedTestCase.setUp(self)
776
    self._setUpLS()
777

    
778
  def _setUpLS(self):
779
    """Helper to (re)initialize the lock set"""
780
    self.resources = ['one', 'two', 'three']
781
    self.ls = locking.LockSet(members=self.resources)
782

    
783
  def testResources(self):
784
    self.assertEquals(self.ls._names(), set(self.resources))
785
    newls = locking.LockSet()
786
    self.assertEquals(newls._names(), set())
787

    
788
  def testAcquireRelease(self):
789
    self.assert_(self.ls.acquire('one'))
790
    self.assertEquals(self.ls._list_owned(), set(['one']))
791
    self.ls.release()
792
    self.assertEquals(self.ls._list_owned(), set())
793
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
794
    self.assertEquals(self.ls._list_owned(), set(['one']))
795
    self.ls.release()
796
    self.assertEquals(self.ls._list_owned(), set())
797
    self.ls.acquire(['one', 'two', 'three'])
798
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
799
    self.ls.release('one')
800
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
801
    self.ls.release(['three'])
802
    self.assertEquals(self.ls._list_owned(), set(['two']))
803
    self.ls.release()
804
    self.assertEquals(self.ls._list_owned(), set())
805
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
806
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
807
    self.ls.release()
808
    self.assertEquals(self.ls._list_owned(), set())
809

    
810
  def testNoDoubleAcquire(self):
811
    self.ls.acquire('one')
812
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
813
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
814
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
815
    self.ls.release()
816
    self.ls.acquire(['one', 'three'])
817
    self.ls.release('one')
818
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
819
    self.ls.release('three')
820

    
821
  def testNoWrongRelease(self):
822
    self.assertRaises(AssertionError, self.ls.release)
823
    self.ls.acquire('one')
824
    self.assertRaises(AssertionError, self.ls.release, 'two')
825

    
826
  def testAddRemove(self):
827
    self.ls.add('four')
828
    self.assertEquals(self.ls._list_owned(), set())
829
    self.assert_('four' in self.ls._names())
830
    self.ls.add(['five', 'six', 'seven'], acquired=1)
831
    self.assert_('five' in self.ls._names())
832
    self.assert_('six' in self.ls._names())
833
    self.assert_('seven' in self.ls._names())
834
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
835
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
836
    self.assert_('five' not in self.ls._names())
837
    self.assert_('six' not in self.ls._names())
838
    self.assertEquals(self.ls._list_owned(), set(['seven']))
839
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
840
    self.ls.remove('seven')
841
    self.assert_('seven' not in self.ls._names())
842
    self.assertEquals(self.ls._list_owned(), set([]))
843
    self.ls.acquire(None, shared=1)
844
    self.assertRaises(AssertionError, self.ls.add, 'eight')
845
    self.ls.release()
846
    self.ls.acquire(None)
847
    self.ls.add('eight', acquired=1)
848
    self.assert_('eight' in self.ls._names())
849
    self.assert_('eight' in self.ls._list_owned())
850
    self.ls.add('nine')
851
    self.assert_('nine' in self.ls._names())
852
    self.assert_('nine' not in self.ls._list_owned())
853
    self.ls.release()
854
    self.ls.remove(['two'])
855
    self.assert_('two' not in self.ls._names())
856
    self.ls.acquire('three')
857
    self.assertEquals(self.ls.remove(['three']), ['three'])
858
    self.assert_('three' not in self.ls._names())
859
    self.assertEquals(self.ls.remove('three'), [])
860
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
861
    self.assert_('one' not in self.ls._names())
862

    
863
  def testRemoveNonBlocking(self):
864
    self.ls.acquire('one')
865
    self.assertEquals(self.ls.remove('one'), ['one'])
866
    self.ls.acquire(['two', 'three'])
867
    self.assertEquals(self.ls.remove(['two', 'three']),
868
                      ['two', 'three'])
869

    
870
  def testNoDoubleAdd(self):
871
    self.assertRaises(errors.LockError, self.ls.add, 'two')
872
    self.ls.add('four')
873
    self.assertRaises(errors.LockError, self.ls.add, 'four')
874

    
875
  def testNoWrongRemoves(self):
876
    self.ls.acquire(['one', 'three'], shared=1)
877
    # Cannot remove 'two' while holding something which is not a superset
878
    self.assertRaises(AssertionError, self.ls.remove, 'two')
879
    # Cannot remove 'three' as we are sharing it
880
    self.assertRaises(AssertionError, self.ls.remove, 'three')
881

    
882
  def testAcquireSetLock(self):
883
    # acquire the set-lock exclusively
884
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
885
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
886
    self.assertEquals(self.ls._is_owned(), True)
887
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
888
    # I can still add/remove elements...
889
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
890
    self.assert_(self.ls.add('six'))
891
    self.ls.release()
892
    # share the set-lock
893
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
894
    # adding new elements is not possible
895
    self.assertRaises(AssertionError, self.ls.add, 'five')
896
    self.ls.release()
897

    
898
  def testAcquireWithRepetitions(self):
899
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
900
                      set(['two', 'two', 'three']))
901
    self.ls.release(['two', 'two'])
902
    self.assertEquals(self.ls._list_owned(), set(['three']))
903

    
904
  def testEmptyAcquire(self):
905
    # Acquire an empty list of locks...
906
    self.assertEquals(self.ls.acquire([]), set())
907
    self.assertEquals(self.ls._list_owned(), set())
908
    # New locks can still be addded
909
    self.assert_(self.ls.add('six'))
910
    # "re-acquiring" is not an issue, since we had really acquired nothing
911
    self.assertEquals(self.ls.acquire([], shared=1), set())
912
    self.assertEquals(self.ls._list_owned(), set())
913
    # We haven't really acquired anything, so we cannot release
914
    self.assertRaises(AssertionError, self.ls.release)
915

    
916
  def _doLockSet(self, names, shared):
917
    try:
918
      self.ls.acquire(names, shared=shared)
919
      self.done.put('DONE')
920
      self.ls.release()
921
    except errors.LockError:
922
      self.done.put('ERR')
923

    
924
  def _doAddSet(self, names):
925
    try:
926
      self.ls.add(names, acquired=1)
927
      self.done.put('DONE')
928
      self.ls.release()
929
    except errors.LockError:
930
      self.done.put('ERR')
931

    
932
  def _doRemoveSet(self, names):
933
    self.done.put(self.ls.remove(names))
934

    
935
  @_Repeat
936
  def testConcurrentSharedAcquire(self):
937
    self.ls.acquire(['one', 'two'], shared=1)
938
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
939
    self._waitThreads()
940
    self.assertEqual(self.done.get_nowait(), 'DONE')
941
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
942
    self._waitThreads()
943
    self.assertEqual(self.done.get_nowait(), 'DONE')
944
    self._addThread(target=self._doLockSet, args=('three', 1))
945
    self._waitThreads()
946
    self.assertEqual(self.done.get_nowait(), 'DONE')
947
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
948
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
949
    self.assertRaises(Queue.Empty, self.done.get_nowait)
950
    self.ls.release()
951
    self._waitThreads()
952
    self.assertEqual(self.done.get_nowait(), 'DONE')
953
    self.assertEqual(self.done.get_nowait(), 'DONE')
954

    
955
  @_Repeat
956
  def testConcurrentExclusiveAcquire(self):
957
    self.ls.acquire(['one', 'two'])
958
    self._addThread(target=self._doLockSet, args=('three', 1))
959
    self._waitThreads()
960
    self.assertEqual(self.done.get_nowait(), 'DONE')
961
    self._addThread(target=self._doLockSet, args=('three', 0))
962
    self._waitThreads()
963
    self.assertEqual(self.done.get_nowait(), 'DONE')
964
    self.assertRaises(Queue.Empty, self.done.get_nowait)
965
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
966
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
967
    self._addThread(target=self._doLockSet, args=('one', 0))
968
    self._addThread(target=self._doLockSet, args=('one', 1))
969
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
970
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
971
    self.assertRaises(Queue.Empty, self.done.get_nowait)
972
    self.ls.release()
973
    self._waitThreads()
974
    for _ in range(6):
975
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
976

    
977
  @_Repeat
978
  def testSimpleAcquireTimeoutExpiring(self):
979
    names = sorted(self.ls._names())
980
    self.assert_(len(names) >= 3)
981

    
982
    # Get name of first lock
983
    first = names[0]
984

    
985
    # Get name of last lock
986
    last = names.pop()
987

    
988
    checks = [
989
      # Block first and try to lock it again
990
      (first, first),
991

    
992
      # Block last and try to lock all locks
993
      (None, first),
994

    
995
      # Block last and try to lock it again
996
      (last, last),
997
      ]
998

    
999
    for (wanted, block) in checks:
1000
      # Lock in exclusive mode
1001
      self.assert_(self.ls.acquire(block, shared=0))
1002

    
1003
      def _AcquireOne():
1004
        # Try to get the same lock again with a timeout (should never succeed)
1005
        if self.ls.acquire(wanted, timeout=0.1, shared=0):
1006
          self.done.put("acquired")
1007
          self.ls.release()
1008
        else:
1009
          self.assert_(not self.ls._list_owned())
1010
          self.assert_(not self.ls._is_owned())
1011
          self.done.put("not acquired")
1012

    
1013
      self._addThread(target=_AcquireOne)
1014

    
1015
      # Wait for timeout in thread to expire
1016
      self._waitThreads()
1017

    
1018
      # Release exclusive lock again
1019
      self.ls.release()
1020

    
1021
      self.assertEqual(self.done.get_nowait(), "not acquired")
1022
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1023

    
1024
  @_Repeat
1025
  def testDelayedAndExpiringLockAcquire(self):
1026
    self._setUpLS()
1027
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1028

    
1029
    for expire in (False, True):
1030
      names = sorted(self.ls._names())
1031
      self.assertEqual(len(names), 8)
1032

    
1033
      lock_ev = dict([(i, threading.Event()) for i in names])
1034

    
1035
      # Lock all in exclusive mode
1036
      self.assert_(self.ls.acquire(names, shared=0))
1037

    
1038
      if expire:
1039
        # We'll wait at least 300ms per lock
1040
        lockwait = len(names) * [0.3]
1041

    
1042
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1043
        # this gives us up to 2.4s to fail.
1044
        lockall_timeout = 0.4
1045
      else:
1046
        # This should finish rather quickly
1047
        lockwait = None
1048
        lockall_timeout = len(names) * 5.0
1049

    
1050
      def _LockAll():
1051
        def acquire_notification(name):
1052
          if not expire:
1053
            self.done.put("getting %s" % name)
1054

    
1055
          # Kick next lock
1056
          lock_ev[name].set()
1057

    
1058
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1059
                           test_notify=acquire_notification):
1060
          self.done.put("got all")
1061
          self.ls.release()
1062
        else:
1063
          self.done.put("timeout on all")
1064

    
1065
        # Notify all locks
1066
        for ev in lock_ev.values():
1067
          ev.set()
1068

    
1069
      t = self._addThread(target=_LockAll)
1070

    
1071
      for idx, name in enumerate(names):
1072
        # Wait for actual acquire on this lock to start
1073
        lock_ev[name].wait(10.0)
1074

    
1075
        if expire and t.isAlive():
1076
          # Wait some time after getting the notification to make sure the lock
1077
          # acquire will expire
1078
          SafeSleep(lockwait[idx])
1079

    
1080
        self.ls.release(names=name)
1081

    
1082
      self.assert_(not self.ls._list_owned())
1083

    
1084
      self._waitThreads()
1085

    
1086
      if expire:
1087
        # Not checking which locks were actually acquired. Doing so would be
1088
        # too timing-dependant.
1089
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1090
      else:
1091
        for i in names:
1092
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1093
        self.assertEqual(self.done.get_nowait(), "got all")
1094
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1095

    
1096
  @_Repeat
1097
  def testConcurrentRemove(self):
1098
    self.ls.add('four')
1099
    self.ls.acquire(['one', 'two', 'four'])
1100
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1101
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1102
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1103
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1104
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1105
    self.ls.remove('one')
1106
    self.ls.release()
1107
    self._waitThreads()
1108
    for i in range(4):
1109
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1110
    self.ls.add(['five', 'six'], acquired=1)
1111
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1112
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1113
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1114
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1115
    self.ls.remove('five')
1116
    self.ls.release()
1117
    self._waitThreads()
1118
    for i in range(4):
1119
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1120
    self.ls.acquire(['three', 'four'])
1121
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1122
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1123
    self.ls.remove('four')
1124
    self._waitThreads()
1125
    self.assertEqual(self.done.get_nowait(), ['six'])
1126
    self._addThread(target=self._doRemoveSet, args=(['two']))
1127
    self._waitThreads()
1128
    self.assertEqual(self.done.get_nowait(), ['two'])
1129
    self.ls.release()
1130
    # reset lockset
1131
    self._setUpLS()
1132

    
1133
  @_Repeat
1134
  def testConcurrentSharedSetLock(self):
1135
    # share the set-lock...
1136
    self.ls.acquire(None, shared=1)
1137
    # ...another thread can share it too
1138
    self._addThread(target=self._doLockSet, args=(None, 1))
1139
    self._waitThreads()
1140
    self.assertEqual(self.done.get_nowait(), 'DONE')
1141
    # ...or just share some elements
1142
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1143
    self._waitThreads()
1144
    self.assertEqual(self.done.get_nowait(), 'DONE')
1145
    # ...but not add new ones or remove any
1146
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1147
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1148
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1149
    # this just releases the set-lock
1150
    self.ls.release([])
1151
    t.join(60)
1152
    self.assertEqual(self.done.get_nowait(), 'DONE')
1153
    # release the lock on the actual elements so remove() can proceed too
1154
    self.ls.release()
1155
    self._waitThreads()
1156
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1157
    # reset lockset
1158
    self._setUpLS()
1159

    
1160
  @_Repeat
1161
  def testConcurrentExclusiveSetLock(self):
1162
    # acquire the set-lock...
1163
    self.ls.acquire(None, shared=0)
1164
    # ...no one can do anything else
1165
    self._addThread(target=self._doLockSet, args=(None, 1))
1166
    self._addThread(target=self._doLockSet, args=(None, 0))
1167
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1168
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1169
    self._addThread(target=self._doAddSet, args=(['nine']))
1170
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1171
    self.ls.release()
1172
    self._waitThreads()
1173
    for _ in range(5):
1174
      self.assertEqual(self.done.get(True, 1), 'DONE')
1175
    # cleanup
1176
    self._setUpLS()
1177

    
1178
  @_Repeat
1179
  def testConcurrentSetLockAdd(self):
1180
    self.ls.acquire('one')
1181
    # Another thread wants the whole SetLock
1182
    self._addThread(target=self._doLockSet, args=(None, 0))
1183
    self._addThread(target=self._doLockSet, args=(None, 1))
1184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1185
    self.assertRaises(AssertionError, self.ls.add, 'four')
1186
    self.ls.release()
1187
    self._waitThreads()
1188
    self.assertEqual(self.done.get_nowait(), 'DONE')
1189
    self.assertEqual(self.done.get_nowait(), 'DONE')
1190
    self.ls.acquire(None)
1191
    self._addThread(target=self._doLockSet, args=(None, 0))
1192
    self._addThread(target=self._doLockSet, args=(None, 1))
1193
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1194
    self.ls.add('four')
1195
    self.ls.add('five', acquired=1)
1196
    self.ls.add('six', acquired=1, shared=1)
1197
    self.assertEquals(self.ls._list_owned(),
1198
      set(['one', 'two', 'three', 'five', 'six']))
1199
    self.assertEquals(self.ls._is_owned(), True)
1200
    self.assertEquals(self.ls._names(),
1201
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1202
    self.ls.release()
1203
    self._waitThreads()
1204
    self.assertEqual(self.done.get_nowait(), 'DONE')
1205
    self.assertEqual(self.done.get_nowait(), 'DONE')
1206
    self._setUpLS()
1207

    
1208
  @_Repeat
1209
  def testEmptyLockSet(self):
1210
    # get the set-lock
1211
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1212
    # now empty it...
1213
    self.ls.remove(['one', 'two', 'three'])
1214
    # and adds/locks by another thread still wait
1215
    self._addThread(target=self._doAddSet, args=(['nine']))
1216
    self._addThread(target=self._doLockSet, args=(None, 1))
1217
    self._addThread(target=self._doLockSet, args=(None, 0))
1218
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1219
    self.ls.release()
1220
    self._waitThreads()
1221
    for _ in range(3):
1222
      self.assertEqual(self.done.get_nowait(), 'DONE')
1223
    # empty it again...
1224
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1225
    # now share it...
1226
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1227
    # other sharers can go, adds still wait
1228
    self._addThread(target=self._doLockSet, args=(None, 1))
1229
    self._waitThreads()
1230
    self.assertEqual(self.done.get_nowait(), 'DONE')
1231
    self._addThread(target=self._doAddSet, args=(['nine']))
1232
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1233
    self.ls.release()
1234
    self._waitThreads()
1235
    self.assertEqual(self.done.get_nowait(), 'DONE')
1236
    self._setUpLS()
1237

    
1238

    
1239
class TestGanetiLockManager(_ThreadedTestCase):
1240

    
1241
  def setUp(self):
1242
    _ThreadedTestCase.setUp(self)
1243
    self.nodes=['n1', 'n2']
1244
    self.instances=['i1', 'i2', 'i3']
1245
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1246
                                        instances=self.instances)
1247

    
1248
  def tearDown(self):
1249
    # Don't try this at home...
1250
    locking.GanetiLockManager._instance = None
1251

    
1252
  def testLockingConstants(self):
1253
    # The locking library internally cheats by assuming its constants have some
1254
    # relationships with each other. Check those hold true.
1255
    # This relationship is also used in the Processor to recursively acquire
1256
    # the right locks. Again, please don't break it.
1257
    for i in range(len(locking.LEVELS)):
1258
      self.assertEqual(i, locking.LEVELS[i])
1259

    
1260
  def testDoubleGLFails(self):
1261
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1262

    
1263
  def testLockNames(self):
1264
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1265
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1266
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1267
                     set(self.instances))
1268

    
1269
  def testInitAndResources(self):
1270
    locking.GanetiLockManager._instance = None
1271
    self.GL = locking.GanetiLockManager()
1272
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1273
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1274
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1275

    
1276
    locking.GanetiLockManager._instance = None
1277
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1278
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1279
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1280
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1281

    
1282
    locking.GanetiLockManager._instance = None
1283
    self.GL = locking.GanetiLockManager(instances=self.instances)
1284
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1285
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1286
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1287
                     set(self.instances))
1288

    
1289
  def testAcquireRelease(self):
1290
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1291
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1292
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1293
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1294
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1295
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1296
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1297
    self.GL.release(locking.LEVEL_NODE)
1298
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1299
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1300
    self.GL.release(locking.LEVEL_INSTANCE)
1301
    self.assertRaises(errors.LockError, self.GL.acquire,
1302
                      locking.LEVEL_INSTANCE, ['i5'])
1303
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1304
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1305

    
1306
  def testAcquireWholeSets(self):
1307
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1308
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1309
                      set(self.instances))
1310
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1311
                      set(self.instances))
1312
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1313
                      set(self.nodes))
1314
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1315
                      set(self.nodes))
1316
    self.GL.release(locking.LEVEL_NODE)
1317
    self.GL.release(locking.LEVEL_INSTANCE)
1318
    self.GL.release(locking.LEVEL_CLUSTER)
1319

    
1320
  def testAcquireWholeAndPartial(self):
1321
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1322
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1323
                      set(self.instances))
1324
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1325
                      set(self.instances))
1326
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1327
                      set(['n2']))
1328
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1329
                      set(['n2']))
1330
    self.GL.release(locking.LEVEL_NODE)
1331
    self.GL.release(locking.LEVEL_INSTANCE)
1332
    self.GL.release(locking.LEVEL_CLUSTER)
1333

    
1334
  def testBGLDependency(self):
1335
    self.assertRaises(AssertionError, self.GL.acquire,
1336
                      locking.LEVEL_NODE, ['n1', 'n2'])
1337
    self.assertRaises(AssertionError, self.GL.acquire,
1338
                      locking.LEVEL_INSTANCE, ['i3'])
1339
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1340
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1341
    self.assertRaises(AssertionError, self.GL.release,
1342
                      locking.LEVEL_CLUSTER, ['BGL'])
1343
    self.assertRaises(AssertionError, self.GL.release,
1344
                      locking.LEVEL_CLUSTER)
1345
    self.GL.release(locking.LEVEL_NODE)
1346
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1347
    self.assertRaises(AssertionError, self.GL.release,
1348
                      locking.LEVEL_CLUSTER, ['BGL'])
1349
    self.assertRaises(AssertionError, self.GL.release,
1350
                      locking.LEVEL_CLUSTER)
1351
    self.GL.release(locking.LEVEL_INSTANCE)
1352

    
1353
  def testWrongOrder(self):
1354
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1355
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1356
    self.assertRaises(AssertionError, self.GL.acquire,
1357
                      locking.LEVEL_NODE, ['n1'])
1358
    self.assertRaises(AssertionError, self.GL.acquire,
1359
                      locking.LEVEL_INSTANCE, ['i2'])
1360

    
1361
  # Helper function to run as a thread that shared the BGL and then acquires
1362
  # some locks at another level.
1363
  def _doLock(self, level, names, shared):
1364
    try:
1365
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1366
      self.GL.acquire(level, names, shared=shared)
1367
      self.done.put('DONE')
1368
      self.GL.release(level)
1369
      self.GL.release(locking.LEVEL_CLUSTER)
1370
    except errors.LockError:
1371
      self.done.put('ERR')
1372

    
1373
  @_Repeat
1374
  def testConcurrency(self):
1375
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1376
    self._addThread(target=self._doLock,
1377
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1378
    self._waitThreads()
1379
    self.assertEqual(self.done.get_nowait(), 'DONE')
1380
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1381
    self._addThread(target=self._doLock,
1382
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1383
    self._waitThreads()
1384
    self.assertEqual(self.done.get_nowait(), 'DONE')
1385
    self._addThread(target=self._doLock,
1386
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1388
    self.GL.release(locking.LEVEL_INSTANCE)
1389
    self._waitThreads()
1390
    self.assertEqual(self.done.get_nowait(), 'DONE')
1391
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1392
    self._addThread(target=self._doLock,
1393
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1394
    self._waitThreads()
1395
    self.assertEqual(self.done.get_nowait(), 'DONE')
1396
    self._addThread(target=self._doLock,
1397
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1398
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1399
    self.GL.release(locking.LEVEL_INSTANCE)
1400
    self._waitThreads()
1401
    self.assertEqual(self.done.get(True, 1), 'DONE')
1402
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1403

    
1404

    
1405
if __name__ == '__main__':
1406
  unittest.main()
1407
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1408
  #unittest.TextTestRunner(verbosity=2).run(suite)