Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 1b045f5d

History | View | Annotate | Download (46.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
# This is used to test the ssynchronize decorator.
38
# Since it's passed as input to a decorator it must be declared as a global.
39
_decoratorlock = locking.SharedLock()
40

    
41
#: List for looping tests
42
ITERATIONS = range(8)
43

    
44

    
45
def _Repeat(fn):
46
  """Decorator for executing a function many times"""
47
  def wrapper(*args, **kwargs):
48
    for i in ITERATIONS:
49
      fn(*args, **kwargs)
50
  return wrapper
51

    
52

    
53
def SafeSleep(duration):
54
  start = time.time()
55
  while True:
56
    delay = start + duration - time.time()
57
    if delay <= 0.0:
58
      break
59
    time.sleep(delay)
60

    
61

    
62
class _ThreadedTestCase(unittest.TestCase):
63
  """Test class that supports adding/waiting on threads"""
64
  def setUp(self):
65
    unittest.TestCase.setUp(self)
66
    self.done = Queue.Queue(0)
67
    self.threads = []
68

    
69
  def _addThread(self, *args, **kwargs):
70
    """Create and remember a new thread"""
71
    t = threading.Thread(*args, **kwargs)
72
    self.threads.append(t)
73
    t.start()
74
    return t
75

    
76
  def _waitThreads(self):
77
    """Wait for all our threads to finish"""
78
    for t in self.threads:
79
      t.join(60)
80
      self.failIf(t.isAlive())
81
    self.threads = []
82

    
83

    
84
class _ConditionTestCase(_ThreadedTestCase):
85
  """Common test case for conditions"""
86

    
87
  def setUp(self, cls):
88
    _ThreadedTestCase.setUp(self)
89
    self.lock = threading.Lock()
90
    self.cond = cls(self.lock)
91

    
92
  def _testAcquireRelease(self):
93
    self.assertFalse(self.cond._is_owned())
94
    self.assertRaises(RuntimeError, self.cond.wait)
95
    self.assertRaises(RuntimeError, self.cond.notifyAll)
96

    
97
    self.cond.acquire()
98
    self.assert_(self.cond._is_owned())
99
    self.cond.notifyAll()
100
    self.assert_(self.cond._is_owned())
101
    self.cond.release()
102

    
103
    self.assertFalse(self.cond._is_owned())
104
    self.assertRaises(RuntimeError, self.cond.wait)
105
    self.assertRaises(RuntimeError, self.cond.notifyAll)
106

    
107
  def _testNotification(self):
108
    def _NotifyAll():
109
      self.done.put("NE")
110
      self.cond.acquire()
111
      self.done.put("NA")
112
      self.cond.notifyAll()
113
      self.done.put("NN")
114
      self.cond.release()
115

    
116
    self.cond.acquire()
117
    self._addThread(target=_NotifyAll)
118
    self.assertEqual(self.done.get(True, 1), "NE")
119
    self.assertRaises(Queue.Empty, self.done.get_nowait)
120
    self.cond.wait()
121
    self.assertEqual(self.done.get(True, 1), "NA")
122
    self.assertEqual(self.done.get(True, 1), "NN")
123
    self.assert_(self.cond._is_owned())
124
    self.cond.release()
125
    self.assertFalse(self.cond._is_owned())
126

    
127

    
128
class TestSingleNotifyPipeCondition(_ConditionTestCase):
129
  """SingleNotifyPipeCondition tests"""
130

    
131
  def setUp(self):
132
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
133

    
134
  def testAcquireRelease(self):
135
    self._testAcquireRelease()
136

    
137
  def testNotification(self):
138
    self._testNotification()
139

    
140
  def testWaitReuse(self):
141
    self.cond.acquire()
142
    self.cond.wait(0)
143
    self.cond.wait(0.1)
144
    self.cond.release()
145

    
146
  def testNoNotifyReuse(self):
147
    self.cond.acquire()
148
    self.cond.notifyAll()
149
    self.assertRaises(RuntimeError, self.cond.wait)
150
    self.assertRaises(RuntimeError, self.cond.notifyAll)
151
    self.cond.release()
152

    
153

    
154
class TestPipeCondition(_ConditionTestCase):
155
  """PipeCondition tests"""
156

    
157
  def setUp(self):
158
    _ConditionTestCase.setUp(self, locking.PipeCondition)
159

    
160
  def testAcquireRelease(self):
161
    self._testAcquireRelease()
162

    
163
  def testNotification(self):
164
    self._testNotification()
165

    
166
  def _TestWait(self, fn):
167
    self._addThread(target=fn)
168
    self._addThread(target=fn)
169
    self._addThread(target=fn)
170

    
171
    # Wait for threads to be waiting
172
    self.assertEqual(self.done.get(True, 1), "A")
173
    self.assertEqual(self.done.get(True, 1), "A")
174
    self.assertEqual(self.done.get(True, 1), "A")
175

    
176
    self.assertRaises(Queue.Empty, self.done.get_nowait)
177

    
178
    self.cond.acquire()
179
    self.assertEqual(self.cond._nwaiters, 3)
180
    # This new thread can"t acquire the lock, and thus call wait, before we
181
    # release it
182
    self._addThread(target=fn)
183
    self.cond.notifyAll()
184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
185
    self.cond.release()
186

    
187
    # We should now get 3 W and 1 A (for the new thread) in whatever order
188
    w = 0
189
    a = 0
190
    for i in range(4):
191
      got = self.done.get(True, 1)
192
      if got == "W":
193
        w += 1
194
      elif got == "A":
195
        a += 1
196
      else:
197
        self.fail("Got %s on the done queue" % got)
198

    
199
    self.assertEqual(w, 3)
200
    self.assertEqual(a, 1)
201

    
202
    self.cond.acquire()
203
    self.cond.notifyAll()
204
    self.cond.release()
205
    self._waitThreads()
206
    self.assertEqual(self.done.get_nowait(), "W")
207
    self.assertRaises(Queue.Empty, self.done.get_nowait)
208

    
209
  def testBlockingWait(self):
210
    def _BlockingWait():
211
      self.cond.acquire()
212
      self.done.put("A")
213
      self.cond.wait()
214
      self.cond.release()
215
      self.done.put("W")
216

    
217
    self._TestWait(_BlockingWait)
218

    
219
  def testLongTimeoutWait(self):
220
    def _Helper():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(15.0)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_Helper)
228

    
229
  def _TimeoutWait(self, timeout, check):
230
    self.cond.acquire()
231
    self.cond.wait(timeout)
232
    self.cond.release()
233
    self.done.put(check)
234

    
235
  def testShortTimeoutWait(self):
236
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
237
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
238
    self._waitThreads()
239
    self.assertEqual(self.done.get_nowait(), "T1")
240
    self.assertEqual(self.done.get_nowait(), "T1")
241
    self.assertRaises(Queue.Empty, self.done.get_nowait)
242

    
243
  def testZeroTimeoutWait(self):
244
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
245
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
246
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
247
    self._waitThreads()
248
    self.assertEqual(self.done.get_nowait(), "T0")
249
    self.assertEqual(self.done.get_nowait(), "T0")
250
    self.assertEqual(self.done.get_nowait(), "T0")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253

    
254
class TestSharedLock(_ThreadedTestCase):
255
  """SharedLock tests"""
256

    
257
  def setUp(self):
258
    _ThreadedTestCase.setUp(self)
259
    self.sl = locking.SharedLock()
260

    
261
  def testSequenceAndOwnership(self):
262
    self.assertFalse(self.sl._is_owned())
263
    self.sl.acquire(shared=1)
264
    self.assert_(self.sl._is_owned())
265
    self.assert_(self.sl._is_owned(shared=1))
266
    self.assertFalse(self.sl._is_owned(shared=0))
267
    self.sl.release()
268
    self.assertFalse(self.sl._is_owned())
269
    self.sl.acquire()
270
    self.assert_(self.sl._is_owned())
271
    self.assertFalse(self.sl._is_owned(shared=1))
272
    self.assert_(self.sl._is_owned(shared=0))
273
    self.sl.release()
274
    self.assertFalse(self.sl._is_owned())
275
    self.sl.acquire(shared=1)
276
    self.assert_(self.sl._is_owned())
277
    self.assert_(self.sl._is_owned(shared=1))
278
    self.assertFalse(self.sl._is_owned(shared=0))
279
    self.sl.release()
280
    self.assertFalse(self.sl._is_owned())
281

    
282
  def testBooleanValue(self):
283
    # semaphores are supposed to return a true value on a successful acquire
284
    self.assert_(self.sl.acquire(shared=1))
285
    self.sl.release()
286
    self.assert_(self.sl.acquire())
287
    self.sl.release()
288

    
289
  def testDoubleLockingStoE(self):
290
    self.sl.acquire(shared=1)
291
    self.assertRaises(AssertionError, self.sl.acquire)
292

    
293
  def testDoubleLockingEtoS(self):
294
    self.sl.acquire()
295
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
296

    
297
  def testDoubleLockingStoS(self):
298
    self.sl.acquire(shared=1)
299
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
300

    
301
  def testDoubleLockingEtoE(self):
302
    self.sl.acquire()
303
    self.assertRaises(AssertionError, self.sl.acquire)
304

    
305
  # helper functions: called in a separate thread they acquire the lock, send
306
  # their identifier on the done queue, then release it.
307
  def _doItSharer(self):
308
    try:
309
      self.sl.acquire(shared=1)
310
      self.done.put('SHR')
311
      self.sl.release()
312
    except errors.LockError:
313
      self.done.put('ERR')
314

    
315
  def _doItExclusive(self):
316
    try:
317
      self.sl.acquire()
318
      self.done.put('EXC')
319
      self.sl.release()
320
    except errors.LockError:
321
      self.done.put('ERR')
322

    
323
  def _doItDelete(self):
324
    try:
325
      self.sl.delete()
326
      self.done.put('DEL')
327
    except errors.LockError:
328
      self.done.put('ERR')
329

    
330
  def testSharersCanCoexist(self):
331
    self.sl.acquire(shared=1)
332
    threading.Thread(target=self._doItSharer).start()
333
    self.assert_(self.done.get(True, 1))
334
    self.sl.release()
335

    
336
  @_Repeat
337
  def testExclusiveBlocksExclusive(self):
338
    self.sl.acquire()
339
    self._addThread(target=self._doItExclusive)
340
    self.assertRaises(Queue.Empty, self.done.get_nowait)
341
    self.sl.release()
342
    self._waitThreads()
343
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
344

    
345
  @_Repeat
346
  def testExclusiveBlocksDelete(self):
347
    self.sl.acquire()
348
    self._addThread(target=self._doItDelete)
349
    self.assertRaises(Queue.Empty, self.done.get_nowait)
350
    self.sl.release()
351
    self._waitThreads()
352
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
353
    self.sl = locking.SharedLock()
354

    
355
  @_Repeat
356
  def testExclusiveBlocksSharer(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItSharer)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
363

    
364
  @_Repeat
365
  def testSharerBlocksExclusive(self):
366
    self.sl.acquire(shared=1)
367
    self._addThread(target=self._doItExclusive)
368
    self.assertRaises(Queue.Empty, self.done.get_nowait)
369
    self.sl.release()
370
    self._waitThreads()
371
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
372

    
373
  @_Repeat
374
  def testSharerBlocksDelete(self):
375
    self.sl.acquire(shared=1)
376
    self._addThread(target=self._doItDelete)
377
    self.assertRaises(Queue.Empty, self.done.get_nowait)
378
    self.sl.release()
379
    self._waitThreads()
380
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
381
    self.sl = locking.SharedLock()
382

    
383
  @_Repeat
384
  def testWaitingExclusiveBlocksSharer(self):
385
    """SKIPPED testWaitingExclusiveBlockSharer"""
386
    return
387

    
388
    self.sl.acquire(shared=1)
389
    # the lock is acquired in shared mode...
390
    self._addThread(target=self._doItExclusive)
391
    # ...but now an exclusive is waiting...
392
    self._addThread(target=self._doItSharer)
393
    # ...so the sharer should be blocked as well
394
    self.assertRaises(Queue.Empty, self.done.get_nowait)
395
    self.sl.release()
396
    self._waitThreads()
397
    # The exclusive passed before
398
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
399
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
400

    
401
  @_Repeat
402
  def testWaitingSharerBlocksExclusive(self):
403
    """SKIPPED testWaitingSharerBlocksExclusive"""
404
    return
405

    
406
    self.sl.acquire()
407
    # the lock is acquired in exclusive mode...
408
    self._addThread(target=self._doItSharer)
409
    # ...but now a sharer is waiting...
410
    self._addThread(target=self._doItExclusive)
411
    # ...the exclusive is waiting too...
412
    self.assertRaises(Queue.Empty, self.done.get_nowait)
413
    self.sl.release()
414
    self._waitThreads()
415
    # The sharer passed before
416
    self.assertEqual(self.done.get_nowait(), 'SHR')
417
    self.assertEqual(self.done.get_nowait(), 'EXC')
418

    
419
  def testDelete(self):
420
    self.sl.delete()
421
    self.assertRaises(errors.LockError, self.sl.acquire)
422
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
423
    self.assertRaises(errors.LockError, self.sl.delete)
424

    
425
  def testDeleteTimeout(self):
426
    self.sl.delete(timeout=60)
427

    
428
  def testNoDeleteIfSharer(self):
429
    self.sl.acquire(shared=1)
430
    self.assertRaises(AssertionError, self.sl.delete)
431

    
432
  @_Repeat
433
  def testDeletePendingSharersExclusiveDelete(self):
434
    self.sl.acquire()
435
    self._addThread(target=self._doItSharer)
436
    self._addThread(target=self._doItSharer)
437
    self._addThread(target=self._doItExclusive)
438
    self._addThread(target=self._doItDelete)
439
    self.sl.delete()
440
    self._waitThreads()
441
    # The threads who were pending return ERR
442
    for _ in range(4):
443
      self.assertEqual(self.done.get_nowait(), 'ERR')
444
    self.sl = locking.SharedLock()
445

    
446
  @_Repeat
447
  def testDeletePendingDeleteExclusiveSharers(self):
448
    self.sl.acquire()
449
    self._addThread(target=self._doItDelete)
450
    self._addThread(target=self._doItExclusive)
451
    self._addThread(target=self._doItSharer)
452
    self._addThread(target=self._doItSharer)
453
    self.sl.delete()
454
    self._waitThreads()
455
    # The two threads who were pending return both ERR
456
    self.assertEqual(self.done.get_nowait(), 'ERR')
457
    self.assertEqual(self.done.get_nowait(), 'ERR')
458
    self.assertEqual(self.done.get_nowait(), 'ERR')
459
    self.assertEqual(self.done.get_nowait(), 'ERR')
460
    self.sl = locking.SharedLock()
461

    
462
  @_Repeat
463
  def testExclusiveAcquireTimeout(self):
464
    for shared in [0, 1]:
465
      on_queue = threading.Event()
466
      release_exclusive = threading.Event()
467

    
468
      def _LockExclusive():
469
        self.sl.acquire(shared=0, test_notify=on_queue.set)
470
        self.done.put("A: start wait")
471
        release_exclusive.wait()
472
        self.done.put("A: end wait")
473
        self.sl.release()
474

    
475
      # Start thread to hold lock in exclusive mode
476
      self._addThread(target=_LockExclusive)
477

    
478
      # Wait for wait to begin
479
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
480

    
481
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
482
      # on the queue
483
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
484
                                      test_notify=release_exclusive.set))
485

    
486
      self.done.put("got 2nd")
487
      self.sl.release()
488

    
489
      self._waitThreads()
490

    
491
      self.assertEqual(self.done.get_nowait(), "A: end wait")
492
      self.assertEqual(self.done.get_nowait(), "got 2nd")
493
      self.assertRaises(Queue.Empty, self.done.get_nowait)
494

    
495
  @_Repeat
496
  def testAcquireExpiringTimeout(self):
497
    def _AcquireWithTimeout(shared, timeout):
498
      if not self.sl.acquire(shared=shared, timeout=timeout):
499
        self.done.put("timeout")
500

    
501
    for shared in [0, 1]:
502
      # Lock exclusively
503
      self.sl.acquire()
504

    
505
      # Start shared acquires with timeout between 0 and 20 ms
506
      for i in range(11):
507
        self._addThread(target=_AcquireWithTimeout,
508
                        args=(shared, i * 2.0 / 1000.0))
509

    
510
      # Wait for threads to finish (makes sure the acquire timeout expires
511
      # before releasing the lock)
512
      self._waitThreads()
513

    
514
      # Release lock
515
      self.sl.release()
516

    
517
      for _ in range(11):
518
        self.assertEqual(self.done.get_nowait(), "timeout")
519

    
520
      self.assertRaises(Queue.Empty, self.done.get_nowait)
521

    
522
  @_Repeat
523
  def testSharedSkipExclusiveAcquires(self):
524
    # Tests whether shared acquires jump in front of exclusive acquires in the
525
    # queue.
526

    
527
    def _Acquire(shared, name, notify_ev, wait_ev):
528
      if notify_ev:
529
        notify_fn = notify_ev.set
530
      else:
531
        notify_fn = None
532

    
533
      if wait_ev:
534
        wait_ev.wait()
535

    
536
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
537
        return
538

    
539
      self.done.put(name)
540
      self.sl.release()
541

    
542
    # Get exclusive lock while we fill the queue
543
    self.sl.acquire()
544

    
545
    shrcnt1 = 5
546
    shrcnt2 = 7
547
    shrcnt3 = 9
548
    shrcnt4 = 2
549

    
550
    # Add acquires using threading.Event for synchronization. They'll be
551
    # acquired exactly in the order defined in this list.
552
    acquires = (shrcnt1 * [(1, "shared 1")] +
553
                3 * [(0, "exclusive 1")] +
554
                shrcnt2 * [(1, "shared 2")] +
555
                shrcnt3 * [(1, "shared 3")] +
556
                shrcnt4 * [(1, "shared 4")] +
557
                3 * [(0, "exclusive 2")])
558

    
559
    ev_cur = None
560
    ev_prev = None
561

    
562
    for args in acquires:
563
      ev_cur = threading.Event()
564
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
565
      ev_prev = ev_cur
566

    
567
    # Wait for last acquire to start
568
    ev_prev.wait()
569

    
570
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
571
    # together
572
    self.assertEqual(self.sl._count_pending(), 7)
573

    
574
    # Release exclusive lock and wait
575
    self.sl.release()
576

    
577
    self._waitThreads()
578

    
579
    # Check sequence
580
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
581
      # Shared locks aren't guaranteed to be notified in order, but they'll be
582
      # first
583
      tmp = self.done.get_nowait()
584
      if tmp == "shared 1":
585
        shrcnt1 -= 1
586
      elif tmp == "shared 2":
587
        shrcnt2 -= 1
588
      elif tmp == "shared 3":
589
        shrcnt3 -= 1
590
      elif tmp == "shared 4":
591
        shrcnt4 -= 1
592
    self.assertEqual(shrcnt1, 0)
593
    self.assertEqual(shrcnt2, 0)
594
    self.assertEqual(shrcnt3, 0)
595
    self.assertEqual(shrcnt3, 0)
596

    
597
    for _ in range(3):
598
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
599

    
600
    for _ in range(3):
601
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
602

    
603
    self.assertRaises(Queue.Empty, self.done.get_nowait)
604

    
605
  @_Repeat
606
  def testMixedAcquireTimeout(self):
607
    sync = threading.Event()
608

    
609
    def _AcquireShared(ev):
610
      if not self.sl.acquire(shared=1, timeout=None):
611
        return
612

    
613
      self.done.put("shared")
614

    
615
      # Notify main thread
616
      ev.set()
617

    
618
      # Wait for notification from main thread
619
      sync.wait()
620

    
621
      # Release lock
622
      self.sl.release()
623

    
624
    acquires = []
625
    for _ in range(3):
626
      ev = threading.Event()
627
      self._addThread(target=_AcquireShared, args=(ev, ))
628
      acquires.append(ev)
629

    
630
    # Wait for all acquires to finish
631
    for i in acquires:
632
      i.wait()
633

    
634
    self.assertEqual(self.sl._count_pending(), 0)
635

    
636
    # Try to get exclusive lock
637
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
638

    
639
    # Acquire exclusive without timeout
640
    exclsync = threading.Event()
641
    exclev = threading.Event()
642

    
643
    def _AcquireExclusive():
644
      if not self.sl.acquire(shared=0):
645
        return
646

    
647
      self.done.put("exclusive")
648

    
649
      # Notify main thread
650
      exclev.set()
651

    
652
      # Wait for notification from main thread
653
      exclsync.wait()
654

    
655
      self.sl.release()
656

    
657
    self._addThread(target=_AcquireExclusive)
658

    
659
    # Try to get exclusive lock
660
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
661

    
662
    # Make all shared holders release their locks
663
    sync.set()
664

    
665
    # Wait for exclusive acquire to succeed
666
    exclev.wait()
667

    
668
    self.assertEqual(self.sl._count_pending(), 0)
669

    
670
    # Try to get exclusive lock
671
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
672

    
673
    def _AcquireSharedSimple():
674
      if self.sl.acquire(shared=1, timeout=None):
675
        self.done.put("shared2")
676
        self.sl.release()
677

    
678
    for _ in range(10):
679
      self._addThread(target=_AcquireSharedSimple)
680

    
681
    # Tell exclusive lock to release
682
    exclsync.set()
683

    
684
    # Wait for everything to finish
685
    self._waitThreads()
686

    
687
    self.assertEqual(self.sl._count_pending(), 0)
688

    
689
    # Check sequence
690
    for _ in range(3):
691
      self.assertEqual(self.done.get_nowait(), "shared")
692

    
693
    self.assertEqual(self.done.get_nowait(), "exclusive")
694

    
695
    for _ in range(10):
696
      self.assertEqual(self.done.get_nowait(), "shared2")
697

    
698
    self.assertRaises(Queue.Empty, self.done.get_nowait)
699

    
700

    
701
class TestSharedLockInCondition(_ThreadedTestCase):
702
  """SharedLock as a condition lock tests"""
703

    
704
  def setUp(self):
705
    _ThreadedTestCase.setUp(self)
706
    self.sl = locking.SharedLock()
707
    self.setCondition()
708

    
709
  def setCondition(self):
710
    self.cond = threading.Condition(self.sl)
711

    
712
  def testKeepMode(self):
713
    self.cond.acquire(shared=1)
714
    self.assert_(self.sl._is_owned(shared=1))
715
    self.cond.wait(0)
716
    self.assert_(self.sl._is_owned(shared=1))
717
    self.cond.release()
718
    self.cond.acquire(shared=0)
719
    self.assert_(self.sl._is_owned(shared=0))
720
    self.cond.wait(0)
721
    self.assert_(self.sl._is_owned(shared=0))
722
    self.cond.release()
723

    
724

    
725
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
726
  """SharedLock as a pipe condition lock tests"""
727

    
728
  def setCondition(self):
729
    self.cond = locking.PipeCondition(self.sl)
730

    
731

    
732
class TestSSynchronizedDecorator(_ThreadedTestCase):
733
  """Shared Lock Synchronized decorator test"""
734

    
735
  def setUp(self):
736
    _ThreadedTestCase.setUp(self)
737

    
738
  @locking.ssynchronized(_decoratorlock)
739
  def _doItExclusive(self):
740
    self.assert_(_decoratorlock._is_owned())
741
    self.done.put('EXC')
742

    
743
  @locking.ssynchronized(_decoratorlock, shared=1)
744
  def _doItSharer(self):
745
    self.assert_(_decoratorlock._is_owned(shared=1))
746
    self.done.put('SHR')
747

    
748
  def testDecoratedFunctions(self):
749
    self._doItExclusive()
750
    self.assertFalse(_decoratorlock._is_owned())
751
    self._doItSharer()
752
    self.assertFalse(_decoratorlock._is_owned())
753

    
754
  def testSharersCanCoexist(self):
755
    _decoratorlock.acquire(shared=1)
756
    threading.Thread(target=self._doItSharer).start()
757
    self.assert_(self.done.get(True, 1))
758
    _decoratorlock.release()
759

    
760
  @_Repeat
761
  def testExclusiveBlocksExclusive(self):
762
    _decoratorlock.acquire()
763
    self._addThread(target=self._doItExclusive)
764
    # give it a bit of time to check that it's not actually doing anything
765
    self.assertRaises(Queue.Empty, self.done.get_nowait)
766
    _decoratorlock.release()
767
    self._waitThreads()
768
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
769

    
770
  @_Repeat
771
  def testExclusiveBlocksSharer(self):
772
    _decoratorlock.acquire()
773
    self._addThread(target=self._doItSharer)
774
    self.assertRaises(Queue.Empty, self.done.get_nowait)
775
    _decoratorlock.release()
776
    self._waitThreads()
777
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
778

    
779
  @_Repeat
780
  def testSharerBlocksExclusive(self):
781
    _decoratorlock.acquire(shared=1)
782
    self._addThread(target=self._doItExclusive)
783
    self.assertRaises(Queue.Empty, self.done.get_nowait)
784
    _decoratorlock.release()
785
    self._waitThreads()
786
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
787

    
788

    
789
class TestLockSet(_ThreadedTestCase):
790
  """LockSet tests"""
791

    
792
  def setUp(self):
793
    _ThreadedTestCase.setUp(self)
794
    self._setUpLS()
795

    
796
  def _setUpLS(self):
797
    """Helper to (re)initialize the lock set"""
798
    self.resources = ['one', 'two', 'three']
799
    self.ls = locking.LockSet(members=self.resources)
800

    
801
  def testResources(self):
802
    self.assertEquals(self.ls._names(), set(self.resources))
803
    newls = locking.LockSet()
804
    self.assertEquals(newls._names(), set())
805

    
806
  def testAcquireRelease(self):
807
    self.assert_(self.ls.acquire('one'))
808
    self.assertEquals(self.ls._list_owned(), set(['one']))
809
    self.ls.release()
810
    self.assertEquals(self.ls._list_owned(), set())
811
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
812
    self.assertEquals(self.ls._list_owned(), set(['one']))
813
    self.ls.release()
814
    self.assertEquals(self.ls._list_owned(), set())
815
    self.ls.acquire(['one', 'two', 'three'])
816
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
817
    self.ls.release('one')
818
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
819
    self.ls.release(['three'])
820
    self.assertEquals(self.ls._list_owned(), set(['two']))
821
    self.ls.release()
822
    self.assertEquals(self.ls._list_owned(), set())
823
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
824
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
825
    self.ls.release()
826
    self.assertEquals(self.ls._list_owned(), set())
827

    
828
  def testNoDoubleAcquire(self):
829
    self.ls.acquire('one')
830
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
831
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
832
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
833
    self.ls.release()
834
    self.ls.acquire(['one', 'three'])
835
    self.ls.release('one')
836
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
837
    self.ls.release('three')
838

    
839
  def testNoWrongRelease(self):
840
    self.assertRaises(AssertionError, self.ls.release)
841
    self.ls.acquire('one')
842
    self.assertRaises(AssertionError, self.ls.release, 'two')
843

    
844
  def testAddRemove(self):
845
    self.ls.add('four')
846
    self.assertEquals(self.ls._list_owned(), set())
847
    self.assert_('four' in self.ls._names())
848
    self.ls.add(['five', 'six', 'seven'], acquired=1)
849
    self.assert_('five' in self.ls._names())
850
    self.assert_('six' in self.ls._names())
851
    self.assert_('seven' in self.ls._names())
852
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
853
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
854
    self.assert_('five' not in self.ls._names())
855
    self.assert_('six' not in self.ls._names())
856
    self.assertEquals(self.ls._list_owned(), set(['seven']))
857
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
858
    self.ls.remove('seven')
859
    self.assert_('seven' not in self.ls._names())
860
    self.assertEquals(self.ls._list_owned(), set([]))
861
    self.ls.acquire(None, shared=1)
862
    self.assertRaises(AssertionError, self.ls.add, 'eight')
863
    self.ls.release()
864
    self.ls.acquire(None)
865
    self.ls.add('eight', acquired=1)
866
    self.assert_('eight' in self.ls._names())
867
    self.assert_('eight' in self.ls._list_owned())
868
    self.ls.add('nine')
869
    self.assert_('nine' in self.ls._names())
870
    self.assert_('nine' not in self.ls._list_owned())
871
    self.ls.release()
872
    self.ls.remove(['two'])
873
    self.assert_('two' not in self.ls._names())
874
    self.ls.acquire('three')
875
    self.assertEquals(self.ls.remove(['three']), ['three'])
876
    self.assert_('three' not in self.ls._names())
877
    self.assertEquals(self.ls.remove('three'), [])
878
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
879
    self.assert_('one' not in self.ls._names())
880

    
881
  def testRemoveNonBlocking(self):
882
    self.ls.acquire('one')
883
    self.assertEquals(self.ls.remove('one'), ['one'])
884
    self.ls.acquire(['two', 'three'])
885
    self.assertEquals(self.ls.remove(['two', 'three']),
886
                      ['two', 'three'])
887

    
888
  def testNoDoubleAdd(self):
889
    self.assertRaises(errors.LockError, self.ls.add, 'two')
890
    self.ls.add('four')
891
    self.assertRaises(errors.LockError, self.ls.add, 'four')
892

    
893
  def testNoWrongRemoves(self):
894
    self.ls.acquire(['one', 'three'], shared=1)
895
    # Cannot remove 'two' while holding something which is not a superset
896
    self.assertRaises(AssertionError, self.ls.remove, 'two')
897
    # Cannot remove 'three' as we are sharing it
898
    self.assertRaises(AssertionError, self.ls.remove, 'three')
899

    
900
  def testAcquireSetLock(self):
901
    # acquire the set-lock exclusively
902
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
903
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
904
    self.assertEquals(self.ls._is_owned(), True)
905
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
906
    # I can still add/remove elements...
907
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
908
    self.assert_(self.ls.add('six'))
909
    self.ls.release()
910
    # share the set-lock
911
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
912
    # adding new elements is not possible
913
    self.assertRaises(AssertionError, self.ls.add, 'five')
914
    self.ls.release()
915

    
916
  def testAcquireWithRepetitions(self):
917
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
918
                      set(['two', 'two', 'three']))
919
    self.ls.release(['two', 'two'])
920
    self.assertEquals(self.ls._list_owned(), set(['three']))
921

    
922
  def testEmptyAcquire(self):
923
    # Acquire an empty list of locks...
924
    self.assertEquals(self.ls.acquire([]), set())
925
    self.assertEquals(self.ls._list_owned(), set())
926
    # New locks can still be addded
927
    self.assert_(self.ls.add('six'))
928
    # "re-acquiring" is not an issue, since we had really acquired nothing
929
    self.assertEquals(self.ls.acquire([], shared=1), set())
930
    self.assertEquals(self.ls._list_owned(), set())
931
    # We haven't really acquired anything, so we cannot release
932
    self.assertRaises(AssertionError, self.ls.release)
933

    
934
  def _doLockSet(self, names, shared):
935
    try:
936
      self.ls.acquire(names, shared=shared)
937
      self.done.put('DONE')
938
      self.ls.release()
939
    except errors.LockError:
940
      self.done.put('ERR')
941

    
942
  def _doAddSet(self, names):
943
    try:
944
      self.ls.add(names, acquired=1)
945
      self.done.put('DONE')
946
      self.ls.release()
947
    except errors.LockError:
948
      self.done.put('ERR')
949

    
950
  def _doRemoveSet(self, names):
951
    self.done.put(self.ls.remove(names))
952

    
953
  @_Repeat
954
  def testConcurrentSharedAcquire(self):
955
    self.ls.acquire(['one', 'two'], shared=1)
956
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
957
    self._waitThreads()
958
    self.assertEqual(self.done.get_nowait(), 'DONE')
959
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
960
    self._waitThreads()
961
    self.assertEqual(self.done.get_nowait(), 'DONE')
962
    self._addThread(target=self._doLockSet, args=('three', 1))
963
    self._waitThreads()
964
    self.assertEqual(self.done.get_nowait(), 'DONE')
965
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
966
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
967
    self.assertRaises(Queue.Empty, self.done.get_nowait)
968
    self.ls.release()
969
    self._waitThreads()
970
    self.assertEqual(self.done.get_nowait(), 'DONE')
971
    self.assertEqual(self.done.get_nowait(), 'DONE')
972

    
973
  @_Repeat
974
  def testConcurrentExclusiveAcquire(self):
975
    self.ls.acquire(['one', 'two'])
976
    self._addThread(target=self._doLockSet, args=('three', 1))
977
    self._waitThreads()
978
    self.assertEqual(self.done.get_nowait(), 'DONE')
979
    self._addThread(target=self._doLockSet, args=('three', 0))
980
    self._waitThreads()
981
    self.assertEqual(self.done.get_nowait(), 'DONE')
982
    self.assertRaises(Queue.Empty, self.done.get_nowait)
983
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
984
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
985
    self._addThread(target=self._doLockSet, args=('one', 0))
986
    self._addThread(target=self._doLockSet, args=('one', 1))
987
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
988
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
989
    self.assertRaises(Queue.Empty, self.done.get_nowait)
990
    self.ls.release()
991
    self._waitThreads()
992
    for _ in range(6):
993
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
994

    
995
  @_Repeat
996
  def testSimpleAcquireTimeoutExpiring(self):
997
    names = sorted(self.ls._names())
998
    self.assert_(len(names) >= 3)
999

    
1000
    # Get name of first lock
1001
    first = names[0]
1002

    
1003
    # Get name of last lock
1004
    last = names.pop()
1005

    
1006
    checks = [
1007
      # Block first and try to lock it again
1008
      (first, first),
1009

    
1010
      # Block last and try to lock all locks
1011
      (None, first),
1012

    
1013
      # Block last and try to lock it again
1014
      (last, last),
1015
      ]
1016

    
1017
    for (wanted, block) in checks:
1018
      # Lock in exclusive mode
1019
      self.assert_(self.ls.acquire(block, shared=0))
1020

    
1021
      def _AcquireOne():
1022
        # Try to get the same lock again with a timeout (should never succeed)
1023
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1024
        if acquired:
1025
          self.done.put("acquired")
1026
          self.ls.release()
1027
        else:
1028
          self.assert_(acquired is None)
1029
          self.assertFalse(self.ls._list_owned())
1030
          self.assertFalse(self.ls._is_owned())
1031
          self.done.put("not acquired")
1032

    
1033
      self._addThread(target=_AcquireOne)
1034

    
1035
      # Wait for timeout in thread to expire
1036
      self._waitThreads()
1037

    
1038
      # Release exclusive lock again
1039
      self.ls.release()
1040

    
1041
      self.assertEqual(self.done.get_nowait(), "not acquired")
1042
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1043

    
1044
  @_Repeat
1045
  def testDelayedAndExpiringLockAcquire(self):
1046
    self._setUpLS()
1047
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1048

    
1049
    for expire in (False, True):
1050
      names = sorted(self.ls._names())
1051
      self.assertEqual(len(names), 8)
1052

    
1053
      lock_ev = dict([(i, threading.Event()) for i in names])
1054

    
1055
      # Lock all in exclusive mode
1056
      self.assert_(self.ls.acquire(names, shared=0))
1057

    
1058
      if expire:
1059
        # We'll wait at least 300ms per lock
1060
        lockwait = len(names) * [0.3]
1061

    
1062
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1063
        # this gives us up to 2.4s to fail.
1064
        lockall_timeout = 0.4
1065
      else:
1066
        # This should finish rather quickly
1067
        lockwait = None
1068
        lockall_timeout = len(names) * 5.0
1069

    
1070
      def _LockAll():
1071
        def acquire_notification(name):
1072
          if not expire:
1073
            self.done.put("getting %s" % name)
1074

    
1075
          # Kick next lock
1076
          lock_ev[name].set()
1077

    
1078
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1079
                           test_notify=acquire_notification):
1080
          self.done.put("got all")
1081
          self.ls.release()
1082
        else:
1083
          self.done.put("timeout on all")
1084

    
1085
        # Notify all locks
1086
        for ev in lock_ev.values():
1087
          ev.set()
1088

    
1089
      t = self._addThread(target=_LockAll)
1090

    
1091
      for idx, name in enumerate(names):
1092
        # Wait for actual acquire on this lock to start
1093
        lock_ev[name].wait(10.0)
1094

    
1095
        if expire and t.isAlive():
1096
          # Wait some time after getting the notification to make sure the lock
1097
          # acquire will expire
1098
          SafeSleep(lockwait[idx])
1099

    
1100
        self.ls.release(names=name)
1101

    
1102
      self.assertFalse(self.ls._list_owned())
1103

    
1104
      self._waitThreads()
1105

    
1106
      if expire:
1107
        # Not checking which locks were actually acquired. Doing so would be
1108
        # too timing-dependant.
1109
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1110
      else:
1111
        for i in names:
1112
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1113
        self.assertEqual(self.done.get_nowait(), "got all")
1114
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1115

    
1116
  @_Repeat
1117
  def testConcurrentRemove(self):
1118
    self.ls.add('four')
1119
    self.ls.acquire(['one', 'two', 'four'])
1120
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1121
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1122
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1123
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1124
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1125
    self.ls.remove('one')
1126
    self.ls.release()
1127
    self._waitThreads()
1128
    for i in range(4):
1129
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1130
    self.ls.add(['five', 'six'], acquired=1)
1131
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1132
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1133
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1134
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1135
    self.ls.remove('five')
1136
    self.ls.release()
1137
    self._waitThreads()
1138
    for i in range(4):
1139
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1140
    self.ls.acquire(['three', 'four'])
1141
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1142
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1143
    self.ls.remove('four')
1144
    self._waitThreads()
1145
    self.assertEqual(self.done.get_nowait(), ['six'])
1146
    self._addThread(target=self._doRemoveSet, args=(['two']))
1147
    self._waitThreads()
1148
    self.assertEqual(self.done.get_nowait(), ['two'])
1149
    self.ls.release()
1150
    # reset lockset
1151
    self._setUpLS()
1152

    
1153
  @_Repeat
1154
  def testConcurrentSharedSetLock(self):
1155
    # share the set-lock...
1156
    self.ls.acquire(None, shared=1)
1157
    # ...another thread can share it too
1158
    self._addThread(target=self._doLockSet, args=(None, 1))
1159
    self._waitThreads()
1160
    self.assertEqual(self.done.get_nowait(), 'DONE')
1161
    # ...or just share some elements
1162
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1163
    self._waitThreads()
1164
    self.assertEqual(self.done.get_nowait(), 'DONE')
1165
    # ...but not add new ones or remove any
1166
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1167
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1168
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1169
    # this just releases the set-lock
1170
    self.ls.release([])
1171
    t.join(60)
1172
    self.assertEqual(self.done.get_nowait(), 'DONE')
1173
    # release the lock on the actual elements so remove() can proceed too
1174
    self.ls.release()
1175
    self._waitThreads()
1176
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1177
    # reset lockset
1178
    self._setUpLS()
1179

    
1180
  @_Repeat
1181
  def testConcurrentExclusiveSetLock(self):
1182
    # acquire the set-lock...
1183
    self.ls.acquire(None, shared=0)
1184
    # ...no one can do anything else
1185
    self._addThread(target=self._doLockSet, args=(None, 1))
1186
    self._addThread(target=self._doLockSet, args=(None, 0))
1187
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1188
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1189
    self._addThread(target=self._doAddSet, args=(['nine']))
1190
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1191
    self.ls.release()
1192
    self._waitThreads()
1193
    for _ in range(5):
1194
      self.assertEqual(self.done.get(True, 1), 'DONE')
1195
    # cleanup
1196
    self._setUpLS()
1197

    
1198
  @_Repeat
1199
  def testConcurrentSetLockAdd(self):
1200
    self.ls.acquire('one')
1201
    # Another thread wants the whole SetLock
1202
    self._addThread(target=self._doLockSet, args=(None, 0))
1203
    self._addThread(target=self._doLockSet, args=(None, 1))
1204
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1205
    self.assertRaises(AssertionError, self.ls.add, 'four')
1206
    self.ls.release()
1207
    self._waitThreads()
1208
    self.assertEqual(self.done.get_nowait(), 'DONE')
1209
    self.assertEqual(self.done.get_nowait(), 'DONE')
1210
    self.ls.acquire(None)
1211
    self._addThread(target=self._doLockSet, args=(None, 0))
1212
    self._addThread(target=self._doLockSet, args=(None, 1))
1213
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1214
    self.ls.add('four')
1215
    self.ls.add('five', acquired=1)
1216
    self.ls.add('six', acquired=1, shared=1)
1217
    self.assertEquals(self.ls._list_owned(),
1218
      set(['one', 'two', 'three', 'five', 'six']))
1219
    self.assertEquals(self.ls._is_owned(), True)
1220
    self.assertEquals(self.ls._names(),
1221
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1222
    self.ls.release()
1223
    self._waitThreads()
1224
    self.assertEqual(self.done.get_nowait(), 'DONE')
1225
    self.assertEqual(self.done.get_nowait(), 'DONE')
1226
    self._setUpLS()
1227

    
1228
  @_Repeat
1229
  def testEmptyLockSet(self):
1230
    # get the set-lock
1231
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1232
    # now empty it...
1233
    self.ls.remove(['one', 'two', 'three'])
1234
    # and adds/locks by another thread still wait
1235
    self._addThread(target=self._doAddSet, args=(['nine']))
1236
    self._addThread(target=self._doLockSet, args=(None, 1))
1237
    self._addThread(target=self._doLockSet, args=(None, 0))
1238
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1239
    self.ls.release()
1240
    self._waitThreads()
1241
    for _ in range(3):
1242
      self.assertEqual(self.done.get_nowait(), 'DONE')
1243
    # empty it again...
1244
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1245
    # now share it...
1246
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1247
    # other sharers can go, adds still wait
1248
    self._addThread(target=self._doLockSet, args=(None, 1))
1249
    self._waitThreads()
1250
    self.assertEqual(self.done.get_nowait(), 'DONE')
1251
    self._addThread(target=self._doAddSet, args=(['nine']))
1252
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1253
    self.ls.release()
1254
    self._waitThreads()
1255
    self.assertEqual(self.done.get_nowait(), 'DONE')
1256
    self._setUpLS()
1257

    
1258

    
1259
class TestGanetiLockManager(_ThreadedTestCase):
1260

    
1261
  def setUp(self):
1262
    _ThreadedTestCase.setUp(self)
1263
    self.nodes=['n1', 'n2']
1264
    self.instances=['i1', 'i2', 'i3']
1265
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1266
                                        instances=self.instances)
1267

    
1268
  def tearDown(self):
1269
    # Don't try this at home...
1270
    locking.GanetiLockManager._instance = None
1271

    
1272
  def testLockingConstants(self):
1273
    # The locking library internally cheats by assuming its constants have some
1274
    # relationships with each other. Check those hold true.
1275
    # This relationship is also used in the Processor to recursively acquire
1276
    # the right locks. Again, please don't break it.
1277
    for i in range(len(locking.LEVELS)):
1278
      self.assertEqual(i, locking.LEVELS[i])
1279

    
1280
  def testDoubleGLFails(self):
1281
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1282

    
1283
  def testLockNames(self):
1284
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1285
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1286
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1287
                     set(self.instances))
1288

    
1289
  def testInitAndResources(self):
1290
    locking.GanetiLockManager._instance = None
1291
    self.GL = locking.GanetiLockManager()
1292
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1293
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1294
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1295

    
1296
    locking.GanetiLockManager._instance = None
1297
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1298
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1299
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1300
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1301

    
1302
    locking.GanetiLockManager._instance = None
1303
    self.GL = locking.GanetiLockManager(instances=self.instances)
1304
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1305
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1306
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1307
                     set(self.instances))
1308

    
1309
  def testAcquireRelease(self):
1310
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1311
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1312
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1313
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1314
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1315
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1316
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1317
    self.GL.release(locking.LEVEL_NODE)
1318
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1319
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1320
    self.GL.release(locking.LEVEL_INSTANCE)
1321
    self.assertRaises(errors.LockError, self.GL.acquire,
1322
                      locking.LEVEL_INSTANCE, ['i5'])
1323
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1324
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1325

    
1326
  def testAcquireWholeSets(self):
1327
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1328
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1329
                      set(self.instances))
1330
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1331
                      set(self.instances))
1332
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1333
                      set(self.nodes))
1334
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1335
                      set(self.nodes))
1336
    self.GL.release(locking.LEVEL_NODE)
1337
    self.GL.release(locking.LEVEL_INSTANCE)
1338
    self.GL.release(locking.LEVEL_CLUSTER)
1339

    
1340
  def testAcquireWholeAndPartial(self):
1341
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1342
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1343
                      set(self.instances))
1344
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1345
                      set(self.instances))
1346
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1347
                      set(['n2']))
1348
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1349
                      set(['n2']))
1350
    self.GL.release(locking.LEVEL_NODE)
1351
    self.GL.release(locking.LEVEL_INSTANCE)
1352
    self.GL.release(locking.LEVEL_CLUSTER)
1353

    
1354
  def testBGLDependency(self):
1355
    self.assertRaises(AssertionError, self.GL.acquire,
1356
                      locking.LEVEL_NODE, ['n1', 'n2'])
1357
    self.assertRaises(AssertionError, self.GL.acquire,
1358
                      locking.LEVEL_INSTANCE, ['i3'])
1359
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1360
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1361
    self.assertRaises(AssertionError, self.GL.release,
1362
                      locking.LEVEL_CLUSTER, ['BGL'])
1363
    self.assertRaises(AssertionError, self.GL.release,
1364
                      locking.LEVEL_CLUSTER)
1365
    self.GL.release(locking.LEVEL_NODE)
1366
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1367
    self.assertRaises(AssertionError, self.GL.release,
1368
                      locking.LEVEL_CLUSTER, ['BGL'])
1369
    self.assertRaises(AssertionError, self.GL.release,
1370
                      locking.LEVEL_CLUSTER)
1371
    self.GL.release(locking.LEVEL_INSTANCE)
1372

    
1373
  def testWrongOrder(self):
1374
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1375
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1376
    self.assertRaises(AssertionError, self.GL.acquire,
1377
                      locking.LEVEL_NODE, ['n1'])
1378
    self.assertRaises(AssertionError, self.GL.acquire,
1379
                      locking.LEVEL_INSTANCE, ['i2'])
1380

    
1381
  # Helper function to run as a thread that shared the BGL and then acquires
1382
  # some locks at another level.
1383
  def _doLock(self, level, names, shared):
1384
    try:
1385
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1386
      self.GL.acquire(level, names, shared=shared)
1387
      self.done.put('DONE')
1388
      self.GL.release(level)
1389
      self.GL.release(locking.LEVEL_CLUSTER)
1390
    except errors.LockError:
1391
      self.done.put('ERR')
1392

    
1393
  @_Repeat
1394
  def testConcurrency(self):
1395
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1396
    self._addThread(target=self._doLock,
1397
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1398
    self._waitThreads()
1399
    self.assertEqual(self.done.get_nowait(), 'DONE')
1400
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1401
    self._addThread(target=self._doLock,
1402
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1403
    self._waitThreads()
1404
    self.assertEqual(self.done.get_nowait(), 'DONE')
1405
    self._addThread(target=self._doLock,
1406
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1407
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1408
    self.GL.release(locking.LEVEL_INSTANCE)
1409
    self._waitThreads()
1410
    self.assertEqual(self.done.get_nowait(), 'DONE')
1411
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1412
    self._addThread(target=self._doLock,
1413
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1414
    self._waitThreads()
1415
    self.assertEqual(self.done.get_nowait(), 'DONE')
1416
    self._addThread(target=self._doLock,
1417
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1418
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1419
    self.GL.release(locking.LEVEL_INSTANCE)
1420
    self._waitThreads()
1421
    self.assertEqual(self.done.get(True, 1), 'DONE')
1422
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1423

    
1424

    
1425
if __name__ == '__main__':
1426
  testutils.GanetiTestProgram()