Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 19b9ba9a

History | View | Annotate | Download (51.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31

    
32
from ganeti import locking
33
from ganeti import errors
34
from ganeti import utils
35

    
36
import testutils
37

    
38

    
39
# This is used to test the ssynchronize decorator.
40
# Since it's passed as input to a decorator it must be declared as a global.
41
_decoratorlock = locking.SharedLock("decorator lock")
42

    
43
#: List for looping tests
44
ITERATIONS = range(8)
45

    
46

    
47
def _Repeat(fn):
48
  """Decorator for executing a function many times"""
49
  def wrapper(*args, **kwargs):
50
    for i in ITERATIONS:
51
      fn(*args, **kwargs)
52
  return wrapper
53

    
54

    
55
def SafeSleep(duration):
56
  start = time.time()
57
  while True:
58
    delay = start + duration - time.time()
59
    if delay <= 0.0:
60
      break
61
    time.sleep(delay)
62

    
63

    
64
class _ThreadedTestCase(unittest.TestCase):
65
  """Test class that supports adding/waiting on threads"""
66
  def setUp(self):
67
    unittest.TestCase.setUp(self)
68
    self.done = Queue.Queue(0)
69
    self.threads = []
70

    
71
  def _addThread(self, *args, **kwargs):
72
    """Create and remember a new thread"""
73
    t = threading.Thread(*args, **kwargs)
74
    self.threads.append(t)
75
    t.start()
76
    return t
77

    
78
  def _waitThreads(self):
79
    """Wait for all our threads to finish"""
80
    for t in self.threads:
81
      t.join(60)
82
      self.failIf(t.isAlive())
83
    self.threads = []
84

    
85

    
86
class _ConditionTestCase(_ThreadedTestCase):
87
  """Common test case for conditions"""
88

    
89
  def setUp(self, cls):
90
    _ThreadedTestCase.setUp(self)
91
    self.lock = threading.Lock()
92
    self.cond = cls(self.lock)
93

    
94
  def _testAcquireRelease(self):
95
    self.assertFalse(self.cond._is_owned())
96
    self.assertRaises(RuntimeError, self.cond.wait)
97
    self.assertRaises(RuntimeError, self.cond.notifyAll)
98

    
99
    self.cond.acquire()
100
    self.assert_(self.cond._is_owned())
101
    self.cond.notifyAll()
102
    self.assert_(self.cond._is_owned())
103
    self.cond.release()
104

    
105
    self.assertFalse(self.cond._is_owned())
106
    self.assertRaises(RuntimeError, self.cond.wait)
107
    self.assertRaises(RuntimeError, self.cond.notifyAll)
108

    
109
  def _testNotification(self):
110
    def _NotifyAll():
111
      self.done.put("NE")
112
      self.cond.acquire()
113
      self.done.put("NA")
114
      self.cond.notifyAll()
115
      self.done.put("NN")
116
      self.cond.release()
117

    
118
    self.cond.acquire()
119
    self._addThread(target=_NotifyAll)
120
    self.assertEqual(self.done.get(True, 1), "NE")
121
    self.assertRaises(Queue.Empty, self.done.get_nowait)
122
    self.cond.wait()
123
    self.assertEqual(self.done.get(True, 1), "NA")
124
    self.assertEqual(self.done.get(True, 1), "NN")
125
    self.assert_(self.cond._is_owned())
126
    self.cond.release()
127
    self.assertFalse(self.cond._is_owned())
128

    
129

    
130
class TestSingleNotifyPipeCondition(_ConditionTestCase):
131
  """SingleNotifyPipeCondition tests"""
132

    
133
  def setUp(self):
134
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
135

    
136
  def testAcquireRelease(self):
137
    self._testAcquireRelease()
138

    
139
  def testNotification(self):
140
    self._testNotification()
141

    
142
  def testWaitReuse(self):
143
    self.cond.acquire()
144
    self.cond.wait(0)
145
    self.cond.wait(0.1)
146
    self.cond.release()
147

    
148
  def testNoNotifyReuse(self):
149
    self.cond.acquire()
150
    self.cond.notifyAll()
151
    self.assertRaises(RuntimeError, self.cond.wait)
152
    self.assertRaises(RuntimeError, self.cond.notifyAll)
153
    self.cond.release()
154

    
155

    
156
class TestPipeCondition(_ConditionTestCase):
157
  """PipeCondition tests"""
158

    
159
  def setUp(self):
160
    _ConditionTestCase.setUp(self, locking.PipeCondition)
161

    
162
  def testAcquireRelease(self):
163
    self._testAcquireRelease()
164

    
165
  def testNotification(self):
166
    self._testNotification()
167

    
168
  def _TestWait(self, fn):
169
    self._addThread(target=fn)
170
    self._addThread(target=fn)
171
    self._addThread(target=fn)
172

    
173
    # Wait for threads to be waiting
174
    self.assertEqual(self.done.get(True, 1), "A")
175
    self.assertEqual(self.done.get(True, 1), "A")
176
    self.assertEqual(self.done.get(True, 1), "A")
177

    
178
    self.assertRaises(Queue.Empty, self.done.get_nowait)
179

    
180
    self.cond.acquire()
181
    self.assertEqual(self.cond._nwaiters, 3)
182
    # This new thread can"t acquire the lock, and thus call wait, before we
183
    # release it
184
    self._addThread(target=fn)
185
    self.cond.notifyAll()
186
    self.assertRaises(Queue.Empty, self.done.get_nowait)
187
    self.cond.release()
188

    
189
    # We should now get 3 W and 1 A (for the new thread) in whatever order
190
    w = 0
191
    a = 0
192
    for i in range(4):
193
      got = self.done.get(True, 1)
194
      if got == "W":
195
        w += 1
196
      elif got == "A":
197
        a += 1
198
      else:
199
        self.fail("Got %s on the done queue" % got)
200

    
201
    self.assertEqual(w, 3)
202
    self.assertEqual(a, 1)
203

    
204
    self.cond.acquire()
205
    self.cond.notifyAll()
206
    self.cond.release()
207
    self._waitThreads()
208
    self.assertEqual(self.done.get_nowait(), "W")
209
    self.assertRaises(Queue.Empty, self.done.get_nowait)
210

    
211
  def testBlockingWait(self):
212
    def _BlockingWait():
213
      self.cond.acquire()
214
      self.done.put("A")
215
      self.cond.wait()
216
      self.cond.release()
217
      self.done.put("W")
218

    
219
    self._TestWait(_BlockingWait)
220

    
221
  def testLongTimeoutWait(self):
222
    def _Helper():
223
      self.cond.acquire()
224
      self.done.put("A")
225
      self.cond.wait(15.0)
226
      self.cond.release()
227
      self.done.put("W")
228

    
229
    self._TestWait(_Helper)
230

    
231
  def _TimeoutWait(self, timeout, check):
232
    self.cond.acquire()
233
    self.cond.wait(timeout)
234
    self.cond.release()
235
    self.done.put(check)
236

    
237
  def testShortTimeoutWait(self):
238
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
239
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
240
    self._waitThreads()
241
    self.assertEqual(self.done.get_nowait(), "T1")
242
    self.assertEqual(self.done.get_nowait(), "T1")
243
    self.assertRaises(Queue.Empty, self.done.get_nowait)
244

    
245
  def testZeroTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
247
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
248
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
249
    self._waitThreads()
250
    self.assertEqual(self.done.get_nowait(), "T0")
251
    self.assertEqual(self.done.get_nowait(), "T0")
252
    self.assertEqual(self.done.get_nowait(), "T0")
253
    self.assertRaises(Queue.Empty, self.done.get_nowait)
254

    
255

    
256
class TestSharedLock(_ThreadedTestCase):
257
  """SharedLock tests"""
258

    
259
  def setUp(self):
260
    _ThreadedTestCase.setUp(self)
261
    self.sl = locking.SharedLock("TestSharedLock")
262

    
263
  def testSequenceAndOwnership(self):
264
    self.assertFalse(self.sl._is_owned())
265
    self.sl.acquire(shared=1)
266
    self.assert_(self.sl._is_owned())
267
    self.assert_(self.sl._is_owned(shared=1))
268
    self.assertFalse(self.sl._is_owned(shared=0))
269
    self.sl.release()
270
    self.assertFalse(self.sl._is_owned())
271
    self.sl.acquire()
272
    self.assert_(self.sl._is_owned())
273
    self.assertFalse(self.sl._is_owned(shared=1))
274
    self.assert_(self.sl._is_owned(shared=0))
275
    self.sl.release()
276
    self.assertFalse(self.sl._is_owned())
277
    self.sl.acquire(shared=1)
278
    self.assert_(self.sl._is_owned())
279
    self.assert_(self.sl._is_owned(shared=1))
280
    self.assertFalse(self.sl._is_owned(shared=0))
281
    self.sl.release()
282
    self.assertFalse(self.sl._is_owned())
283

    
284
  def testBooleanValue(self):
285
    # semaphores are supposed to return a true value on a successful acquire
286
    self.assert_(self.sl.acquire(shared=1))
287
    self.sl.release()
288
    self.assert_(self.sl.acquire())
289
    self.sl.release()
290

    
291
  def testDoubleLockingStoE(self):
292
    self.sl.acquire(shared=1)
293
    self.assertRaises(AssertionError, self.sl.acquire)
294

    
295
  def testDoubleLockingEtoS(self):
296
    self.sl.acquire()
297
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
298

    
299
  def testDoubleLockingStoS(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
302

    
303
  def testDoubleLockingEtoE(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire)
306

    
307
  # helper functions: called in a separate thread they acquire the lock, send
308
  # their identifier on the done queue, then release it.
309
  def _doItSharer(self):
310
    try:
311
      self.sl.acquire(shared=1)
312
      self.done.put('SHR')
313
      self.sl.release()
314
    except errors.LockError:
315
      self.done.put('ERR')
316

    
317
  def _doItExclusive(self):
318
    try:
319
      self.sl.acquire()
320
      self.done.put('EXC')
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put('ERR')
324

    
325
  def _doItDelete(self):
326
    try:
327
      self.sl.delete()
328
      self.done.put('DEL')
329
    except errors.LockError:
330
      self.done.put('ERR')
331

    
332
  def testSharersCanCoexist(self):
333
    self.sl.acquire(shared=1)
334
    threading.Thread(target=self._doItSharer).start()
335
    self.assert_(self.done.get(True, 1))
336
    self.sl.release()
337

    
338
  @_Repeat
339
  def testExclusiveBlocksExclusive(self):
340
    self.sl.acquire()
341
    self._addThread(target=self._doItExclusive)
342
    self.assertRaises(Queue.Empty, self.done.get_nowait)
343
    self.sl.release()
344
    self._waitThreads()
345
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
346

    
347
  @_Repeat
348
  def testExclusiveBlocksDelete(self):
349
    self.sl.acquire()
350
    self._addThread(target=self._doItDelete)
351
    self.assertRaises(Queue.Empty, self.done.get_nowait)
352
    self.sl.release()
353
    self._waitThreads()
354
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
355
    self.sl = locking.SharedLock(self.sl.name)
356

    
357
  @_Repeat
358
  def testExclusiveBlocksSharer(self):
359
    self.sl.acquire()
360
    self._addThread(target=self._doItSharer)
361
    self.assertRaises(Queue.Empty, self.done.get_nowait)
362
    self.sl.release()
363
    self._waitThreads()
364
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
365

    
366
  @_Repeat
367
  def testSharerBlocksExclusive(self):
368
    self.sl.acquire(shared=1)
369
    self._addThread(target=self._doItExclusive)
370
    self.assertRaises(Queue.Empty, self.done.get_nowait)
371
    self.sl.release()
372
    self._waitThreads()
373
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
374

    
375
  @_Repeat
376
  def testSharerBlocksDelete(self):
377
    self.sl.acquire(shared=1)
378
    self._addThread(target=self._doItDelete)
379
    self.assertRaises(Queue.Empty, self.done.get_nowait)
380
    self.sl.release()
381
    self._waitThreads()
382
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
383
    self.sl = locking.SharedLock(self.sl.name)
384

    
385
  @_Repeat
386
  def testWaitingExclusiveBlocksSharer(self):
387
    """SKIPPED testWaitingExclusiveBlockSharer"""
388
    return
389

    
390
    self.sl.acquire(shared=1)
391
    # the lock is acquired in shared mode...
392
    self._addThread(target=self._doItExclusive)
393
    # ...but now an exclusive is waiting...
394
    self._addThread(target=self._doItSharer)
395
    # ...so the sharer should be blocked as well
396
    self.assertRaises(Queue.Empty, self.done.get_nowait)
397
    self.sl.release()
398
    self._waitThreads()
399
    # The exclusive passed before
400
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
401
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
402

    
403
  @_Repeat
404
  def testWaitingSharerBlocksExclusive(self):
405
    """SKIPPED testWaitingSharerBlocksExclusive"""
406
    return
407

    
408
    self.sl.acquire()
409
    # the lock is acquired in exclusive mode...
410
    self._addThread(target=self._doItSharer)
411
    # ...but now a sharer is waiting...
412
    self._addThread(target=self._doItExclusive)
413
    # ...the exclusive is waiting too...
414
    self.assertRaises(Queue.Empty, self.done.get_nowait)
415
    self.sl.release()
416
    self._waitThreads()
417
    # The sharer passed before
418
    self.assertEqual(self.done.get_nowait(), 'SHR')
419
    self.assertEqual(self.done.get_nowait(), 'EXC')
420

    
421
  def testDelete(self):
422
    self.sl.delete()
423
    self.assertRaises(errors.LockError, self.sl.acquire)
424
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
425
    self.assertRaises(errors.LockError, self.sl.delete)
426

    
427
  def testDeleteTimeout(self):
428
    self.sl.delete(timeout=60)
429

    
430
  def testNoDeleteIfSharer(self):
431
    self.sl.acquire(shared=1)
432
    self.assertRaises(AssertionError, self.sl.delete)
433

    
434
  @_Repeat
435
  def testDeletePendingSharersExclusiveDelete(self):
436
    self.sl.acquire()
437
    self._addThread(target=self._doItSharer)
438
    self._addThread(target=self._doItSharer)
439
    self._addThread(target=self._doItExclusive)
440
    self._addThread(target=self._doItDelete)
441
    self.sl.delete()
442
    self._waitThreads()
443
    # The threads who were pending return ERR
444
    for _ in range(4):
445
      self.assertEqual(self.done.get_nowait(), 'ERR')
446
    self.sl = locking.SharedLock(self.sl.name)
447

    
448
  @_Repeat
449
  def testDeletePendingDeleteExclusiveSharers(self):
450
    self.sl.acquire()
451
    self._addThread(target=self._doItDelete)
452
    self._addThread(target=self._doItExclusive)
453
    self._addThread(target=self._doItSharer)
454
    self._addThread(target=self._doItSharer)
455
    self.sl.delete()
456
    self._waitThreads()
457
    # The two threads who were pending return both ERR
458
    self.assertEqual(self.done.get_nowait(), 'ERR')
459
    self.assertEqual(self.done.get_nowait(), 'ERR')
460
    self.assertEqual(self.done.get_nowait(), 'ERR')
461
    self.assertEqual(self.done.get_nowait(), 'ERR')
462
    self.sl = locking.SharedLock(self.sl.name)
463

    
464
  @_Repeat
465
  def testExclusiveAcquireTimeout(self):
466
    for shared in [0, 1]:
467
      on_queue = threading.Event()
468
      release_exclusive = threading.Event()
469

    
470
      def _LockExclusive():
471
        self.sl.acquire(shared=0, test_notify=on_queue.set)
472
        self.done.put("A: start wait")
473
        release_exclusive.wait()
474
        self.done.put("A: end wait")
475
        self.sl.release()
476

    
477
      # Start thread to hold lock in exclusive mode
478
      self._addThread(target=_LockExclusive)
479

    
480
      # Wait for wait to begin
481
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
482

    
483
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
484
      # on the queue
485
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
486
                                      test_notify=release_exclusive.set))
487

    
488
      self.done.put("got 2nd")
489
      self.sl.release()
490

    
491
      self._waitThreads()
492

    
493
      self.assertEqual(self.done.get_nowait(), "A: end wait")
494
      self.assertEqual(self.done.get_nowait(), "got 2nd")
495
      self.assertRaises(Queue.Empty, self.done.get_nowait)
496

    
497
  @_Repeat
498
  def testAcquireExpiringTimeout(self):
499
    def _AcquireWithTimeout(shared, timeout):
500
      if not self.sl.acquire(shared=shared, timeout=timeout):
501
        self.done.put("timeout")
502

    
503
    for shared in [0, 1]:
504
      # Lock exclusively
505
      self.sl.acquire()
506

    
507
      # Start shared acquires with timeout between 0 and 20 ms
508
      for i in range(11):
509
        self._addThread(target=_AcquireWithTimeout,
510
                        args=(shared, i * 2.0 / 1000.0))
511

    
512
      # Wait for threads to finish (makes sure the acquire timeout expires
513
      # before releasing the lock)
514
      self._waitThreads()
515

    
516
      # Release lock
517
      self.sl.release()
518

    
519
      for _ in range(11):
520
        self.assertEqual(self.done.get_nowait(), "timeout")
521

    
522
      self.assertRaises(Queue.Empty, self.done.get_nowait)
523

    
524
  @_Repeat
525
  def testSharedSkipExclusiveAcquires(self):
526
    # Tests whether shared acquires jump in front of exclusive acquires in the
527
    # queue.
528

    
529
    def _Acquire(shared, name, notify_ev, wait_ev):
530
      if notify_ev:
531
        notify_fn = notify_ev.set
532
      else:
533
        notify_fn = None
534

    
535
      if wait_ev:
536
        wait_ev.wait()
537

    
538
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
539
        return
540

    
541
      self.done.put(name)
542
      self.sl.release()
543

    
544
    # Get exclusive lock while we fill the queue
545
    self.sl.acquire()
546

    
547
    shrcnt1 = 5
548
    shrcnt2 = 7
549
    shrcnt3 = 9
550
    shrcnt4 = 2
551

    
552
    # Add acquires using threading.Event for synchronization. They'll be
553
    # acquired exactly in the order defined in this list.
554
    acquires = (shrcnt1 * [(1, "shared 1")] +
555
                3 * [(0, "exclusive 1")] +
556
                shrcnt2 * [(1, "shared 2")] +
557
                shrcnt3 * [(1, "shared 3")] +
558
                shrcnt4 * [(1, "shared 4")] +
559
                3 * [(0, "exclusive 2")])
560

    
561
    ev_cur = None
562
    ev_prev = None
563

    
564
    for args in acquires:
565
      ev_cur = threading.Event()
566
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
567
      ev_prev = ev_cur
568

    
569
    # Wait for last acquire to start
570
    ev_prev.wait()
571

    
572
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
573
    # together
574
    self.assertEqual(self.sl._count_pending(), 7)
575

    
576
    # Release exclusive lock and wait
577
    self.sl.release()
578

    
579
    self._waitThreads()
580

    
581
    # Check sequence
582
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
583
      # Shared locks aren't guaranteed to be notified in order, but they'll be
584
      # first
585
      tmp = self.done.get_nowait()
586
      if tmp == "shared 1":
587
        shrcnt1 -= 1
588
      elif tmp == "shared 2":
589
        shrcnt2 -= 1
590
      elif tmp == "shared 3":
591
        shrcnt3 -= 1
592
      elif tmp == "shared 4":
593
        shrcnt4 -= 1
594
    self.assertEqual(shrcnt1, 0)
595
    self.assertEqual(shrcnt2, 0)
596
    self.assertEqual(shrcnt3, 0)
597
    self.assertEqual(shrcnt3, 0)
598

    
599
    for _ in range(3):
600
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
601

    
602
    for _ in range(3):
603
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
604

    
605
    self.assertRaises(Queue.Empty, self.done.get_nowait)
606

    
607
  @_Repeat
608
  def testMixedAcquireTimeout(self):
609
    sync = threading.Event()
610

    
611
    def _AcquireShared(ev):
612
      if not self.sl.acquire(shared=1, timeout=None):
613
        return
614

    
615
      self.done.put("shared")
616

    
617
      # Notify main thread
618
      ev.set()
619

    
620
      # Wait for notification from main thread
621
      sync.wait()
622

    
623
      # Release lock
624
      self.sl.release()
625

    
626
    acquires = []
627
    for _ in range(3):
628
      ev = threading.Event()
629
      self._addThread(target=_AcquireShared, args=(ev, ))
630
      acquires.append(ev)
631

    
632
    # Wait for all acquires to finish
633
    for i in acquires:
634
      i.wait()
635

    
636
    self.assertEqual(self.sl._count_pending(), 0)
637

    
638
    # Try to get exclusive lock
639
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
640

    
641
    # Acquire exclusive without timeout
642
    exclsync = threading.Event()
643
    exclev = threading.Event()
644

    
645
    def _AcquireExclusive():
646
      if not self.sl.acquire(shared=0):
647
        return
648

    
649
      self.done.put("exclusive")
650

    
651
      # Notify main thread
652
      exclev.set()
653

    
654
      # Wait for notification from main thread
655
      exclsync.wait()
656

    
657
      self.sl.release()
658

    
659
    self._addThread(target=_AcquireExclusive)
660

    
661
    # Try to get exclusive lock
662
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
663

    
664
    # Make all shared holders release their locks
665
    sync.set()
666

    
667
    # Wait for exclusive acquire to succeed
668
    exclev.wait()
669

    
670
    self.assertEqual(self.sl._count_pending(), 0)
671

    
672
    # Try to get exclusive lock
673
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
674

    
675
    def _AcquireSharedSimple():
676
      if self.sl.acquire(shared=1, timeout=None):
677
        self.done.put("shared2")
678
        self.sl.release()
679

    
680
    for _ in range(10):
681
      self._addThread(target=_AcquireSharedSimple)
682

    
683
    # Tell exclusive lock to release
684
    exclsync.set()
685

    
686
    # Wait for everything to finish
687
    self._waitThreads()
688

    
689
    self.assertEqual(self.sl._count_pending(), 0)
690

    
691
    # Check sequence
692
    for _ in range(3):
693
      self.assertEqual(self.done.get_nowait(), "shared")
694

    
695
    self.assertEqual(self.done.get_nowait(), "exclusive")
696

    
697
    for _ in range(10):
698
      self.assertEqual(self.done.get_nowait(), "shared2")
699

    
700
    self.assertRaises(Queue.Empty, self.done.get_nowait)
701

    
702

    
703
class TestSharedLockInCondition(_ThreadedTestCase):
704
  """SharedLock as a condition lock tests"""
705

    
706
  def setUp(self):
707
    _ThreadedTestCase.setUp(self)
708
    self.sl = locking.SharedLock("TestSharedLockInCondition")
709
    self.setCondition()
710

    
711
  def setCondition(self):
712
    self.cond = threading.Condition(self.sl)
713

    
714
  def testKeepMode(self):
715
    self.cond.acquire(shared=1)
716
    self.assert_(self.sl._is_owned(shared=1))
717
    self.cond.wait(0)
718
    self.assert_(self.sl._is_owned(shared=1))
719
    self.cond.release()
720
    self.cond.acquire(shared=0)
721
    self.assert_(self.sl._is_owned(shared=0))
722
    self.cond.wait(0)
723
    self.assert_(self.sl._is_owned(shared=0))
724
    self.cond.release()
725

    
726

    
727
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
728
  """SharedLock as a pipe condition lock tests"""
729

    
730
  def setCondition(self):
731
    self.cond = locking.PipeCondition(self.sl)
732

    
733

    
734
class TestSSynchronizedDecorator(_ThreadedTestCase):
735
  """Shared Lock Synchronized decorator test"""
736

    
737
  def setUp(self):
738
    _ThreadedTestCase.setUp(self)
739

    
740
  @locking.ssynchronized(_decoratorlock)
741
  def _doItExclusive(self):
742
    self.assert_(_decoratorlock._is_owned())
743
    self.done.put('EXC')
744

    
745
  @locking.ssynchronized(_decoratorlock, shared=1)
746
  def _doItSharer(self):
747
    self.assert_(_decoratorlock._is_owned(shared=1))
748
    self.done.put('SHR')
749

    
750
  def testDecoratedFunctions(self):
751
    self._doItExclusive()
752
    self.assertFalse(_decoratorlock._is_owned())
753
    self._doItSharer()
754
    self.assertFalse(_decoratorlock._is_owned())
755

    
756
  def testSharersCanCoexist(self):
757
    _decoratorlock.acquire(shared=1)
758
    threading.Thread(target=self._doItSharer).start()
759
    self.assert_(self.done.get(True, 1))
760
    _decoratorlock.release()
761

    
762
  @_Repeat
763
  def testExclusiveBlocksExclusive(self):
764
    _decoratorlock.acquire()
765
    self._addThread(target=self._doItExclusive)
766
    # give it a bit of time to check that it's not actually doing anything
767
    self.assertRaises(Queue.Empty, self.done.get_nowait)
768
    _decoratorlock.release()
769
    self._waitThreads()
770
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
771

    
772
  @_Repeat
773
  def testExclusiveBlocksSharer(self):
774
    _decoratorlock.acquire()
775
    self._addThread(target=self._doItSharer)
776
    self.assertRaises(Queue.Empty, self.done.get_nowait)
777
    _decoratorlock.release()
778
    self._waitThreads()
779
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
780

    
781
  @_Repeat
782
  def testSharerBlocksExclusive(self):
783
    _decoratorlock.acquire(shared=1)
784
    self._addThread(target=self._doItExclusive)
785
    self.assertRaises(Queue.Empty, self.done.get_nowait)
786
    _decoratorlock.release()
787
    self._waitThreads()
788
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
789

    
790

    
791
class TestLockSet(_ThreadedTestCase):
792
  """LockSet tests"""
793

    
794
  def setUp(self):
795
    _ThreadedTestCase.setUp(self)
796
    self._setUpLS()
797

    
798
  def _setUpLS(self):
799
    """Helper to (re)initialize the lock set"""
800
    self.resources = ['one', 'two', 'three']
801
    self.ls = locking.LockSet(self.resources, "TestLockSet")
802

    
803
  def testResources(self):
804
    self.assertEquals(self.ls._names(), set(self.resources))
805
    newls = locking.LockSet([], "TestLockSet.testResources")
806
    self.assertEquals(newls._names(), set())
807

    
808
  def testAcquireRelease(self):
809
    self.assert_(self.ls.acquire('one'))
810
    self.assertEquals(self.ls._list_owned(), set(['one']))
811
    self.ls.release()
812
    self.assertEquals(self.ls._list_owned(), set())
813
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
814
    self.assertEquals(self.ls._list_owned(), set(['one']))
815
    self.ls.release()
816
    self.assertEquals(self.ls._list_owned(), set())
817
    self.ls.acquire(['one', 'two', 'three'])
818
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
819
    self.ls.release('one')
820
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
821
    self.ls.release(['three'])
822
    self.assertEquals(self.ls._list_owned(), set(['two']))
823
    self.ls.release()
824
    self.assertEquals(self.ls._list_owned(), set())
825
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
826
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
827
    self.ls.release()
828
    self.assertEquals(self.ls._list_owned(), set())
829

    
830
  def testNoDoubleAcquire(self):
831
    self.ls.acquire('one')
832
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
833
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
834
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
835
    self.ls.release()
836
    self.ls.acquire(['one', 'three'])
837
    self.ls.release('one')
838
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
839
    self.ls.release('three')
840

    
841
  def testNoWrongRelease(self):
842
    self.assertRaises(AssertionError, self.ls.release)
843
    self.ls.acquire('one')
844
    self.assertRaises(AssertionError, self.ls.release, 'two')
845

    
846
  def testAddRemove(self):
847
    self.ls.add('four')
848
    self.assertEquals(self.ls._list_owned(), set())
849
    self.assert_('four' in self.ls._names())
850
    self.ls.add(['five', 'six', 'seven'], acquired=1)
851
    self.assert_('five' in self.ls._names())
852
    self.assert_('six' in self.ls._names())
853
    self.assert_('seven' in self.ls._names())
854
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
855
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
856
    self.assert_('five' not in self.ls._names())
857
    self.assert_('six' not in self.ls._names())
858
    self.assertEquals(self.ls._list_owned(), set(['seven']))
859
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
860
    self.ls.remove('seven')
861
    self.assert_('seven' not in self.ls._names())
862
    self.assertEquals(self.ls._list_owned(), set([]))
863
    self.ls.acquire(None, shared=1)
864
    self.assertRaises(AssertionError, self.ls.add, 'eight')
865
    self.ls.release()
866
    self.ls.acquire(None)
867
    self.ls.add('eight', acquired=1)
868
    self.assert_('eight' in self.ls._names())
869
    self.assert_('eight' in self.ls._list_owned())
870
    self.ls.add('nine')
871
    self.assert_('nine' in self.ls._names())
872
    self.assert_('nine' not in self.ls._list_owned())
873
    self.ls.release()
874
    self.ls.remove(['two'])
875
    self.assert_('two' not in self.ls._names())
876
    self.ls.acquire('three')
877
    self.assertEquals(self.ls.remove(['three']), ['three'])
878
    self.assert_('three' not in self.ls._names())
879
    self.assertEquals(self.ls.remove('three'), [])
880
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
881
    self.assert_('one' not in self.ls._names())
882

    
883
  def testRemoveNonBlocking(self):
884
    self.ls.acquire('one')
885
    self.assertEquals(self.ls.remove('one'), ['one'])
886
    self.ls.acquire(['two', 'three'])
887
    self.assertEquals(self.ls.remove(['two', 'three']),
888
                      ['two', 'three'])
889

    
890
  def testNoDoubleAdd(self):
891
    self.assertRaises(errors.LockError, self.ls.add, 'two')
892
    self.ls.add('four')
893
    self.assertRaises(errors.LockError, self.ls.add, 'four')
894

    
895
  def testNoWrongRemoves(self):
896
    self.ls.acquire(['one', 'three'], shared=1)
897
    # Cannot remove 'two' while holding something which is not a superset
898
    self.assertRaises(AssertionError, self.ls.remove, 'two')
899
    # Cannot remove 'three' as we are sharing it
900
    self.assertRaises(AssertionError, self.ls.remove, 'three')
901

    
902
  def testAcquireSetLock(self):
903
    # acquire the set-lock exclusively
904
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
905
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
906
    self.assertEquals(self.ls._is_owned(), True)
907
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
908
    # I can still add/remove elements...
909
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
910
    self.assert_(self.ls.add('six'))
911
    self.ls.release()
912
    # share the set-lock
913
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
914
    # adding new elements is not possible
915
    self.assertRaises(AssertionError, self.ls.add, 'five')
916
    self.ls.release()
917

    
918
  def testAcquireWithRepetitions(self):
919
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
920
                      set(['two', 'two', 'three']))
921
    self.ls.release(['two', 'two'])
922
    self.assertEquals(self.ls._list_owned(), set(['three']))
923

    
924
  def testEmptyAcquire(self):
925
    # Acquire an empty list of locks...
926
    self.assertEquals(self.ls.acquire([]), set())
927
    self.assertEquals(self.ls._list_owned(), set())
928
    # New locks can still be addded
929
    self.assert_(self.ls.add('six'))
930
    # "re-acquiring" is not an issue, since we had really acquired nothing
931
    self.assertEquals(self.ls.acquire([], shared=1), set())
932
    self.assertEquals(self.ls._list_owned(), set())
933
    # We haven't really acquired anything, so we cannot release
934
    self.assertRaises(AssertionError, self.ls.release)
935

    
936
  def _doLockSet(self, names, shared):
937
    try:
938
      self.ls.acquire(names, shared=shared)
939
      self.done.put('DONE')
940
      self.ls.release()
941
    except errors.LockError:
942
      self.done.put('ERR')
943

    
944
  def _doAddSet(self, names):
945
    try:
946
      self.ls.add(names, acquired=1)
947
      self.done.put('DONE')
948
      self.ls.release()
949
    except errors.LockError:
950
      self.done.put('ERR')
951

    
952
  def _doRemoveSet(self, names):
953
    self.done.put(self.ls.remove(names))
954

    
955
  @_Repeat
956
  def testConcurrentSharedAcquire(self):
957
    self.ls.acquire(['one', 'two'], shared=1)
958
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
959
    self._waitThreads()
960
    self.assertEqual(self.done.get_nowait(), 'DONE')
961
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
962
    self._waitThreads()
963
    self.assertEqual(self.done.get_nowait(), 'DONE')
964
    self._addThread(target=self._doLockSet, args=('three', 1))
965
    self._waitThreads()
966
    self.assertEqual(self.done.get_nowait(), 'DONE')
967
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
968
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
969
    self.assertRaises(Queue.Empty, self.done.get_nowait)
970
    self.ls.release()
971
    self._waitThreads()
972
    self.assertEqual(self.done.get_nowait(), 'DONE')
973
    self.assertEqual(self.done.get_nowait(), 'DONE')
974

    
975
  @_Repeat
976
  def testConcurrentExclusiveAcquire(self):
977
    self.ls.acquire(['one', 'two'])
978
    self._addThread(target=self._doLockSet, args=('three', 1))
979
    self._waitThreads()
980
    self.assertEqual(self.done.get_nowait(), 'DONE')
981
    self._addThread(target=self._doLockSet, args=('three', 0))
982
    self._waitThreads()
983
    self.assertEqual(self.done.get_nowait(), 'DONE')
984
    self.assertRaises(Queue.Empty, self.done.get_nowait)
985
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
986
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
987
    self._addThread(target=self._doLockSet, args=('one', 0))
988
    self._addThread(target=self._doLockSet, args=('one', 1))
989
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
990
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
991
    self.assertRaises(Queue.Empty, self.done.get_nowait)
992
    self.ls.release()
993
    self._waitThreads()
994
    for _ in range(6):
995
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
996

    
997
  @_Repeat
998
  def testSimpleAcquireTimeoutExpiring(self):
999
    names = sorted(self.ls._names())
1000
    self.assert_(len(names) >= 3)
1001

    
1002
    # Get name of first lock
1003
    first = names[0]
1004

    
1005
    # Get name of last lock
1006
    last = names.pop()
1007

    
1008
    checks = [
1009
      # Block first and try to lock it again
1010
      (first, first),
1011

    
1012
      # Block last and try to lock all locks
1013
      (None, first),
1014

    
1015
      # Block last and try to lock it again
1016
      (last, last),
1017
      ]
1018

    
1019
    for (wanted, block) in checks:
1020
      # Lock in exclusive mode
1021
      self.assert_(self.ls.acquire(block, shared=0))
1022

    
1023
      def _AcquireOne():
1024
        # Try to get the same lock again with a timeout (should never succeed)
1025
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1026
        if acquired:
1027
          self.done.put("acquired")
1028
          self.ls.release()
1029
        else:
1030
          self.assert_(acquired is None)
1031
          self.assertFalse(self.ls._list_owned())
1032
          self.assertFalse(self.ls._is_owned())
1033
          self.done.put("not acquired")
1034

    
1035
      self._addThread(target=_AcquireOne)
1036

    
1037
      # Wait for timeout in thread to expire
1038
      self._waitThreads()
1039

    
1040
      # Release exclusive lock again
1041
      self.ls.release()
1042

    
1043
      self.assertEqual(self.done.get_nowait(), "not acquired")
1044
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1045

    
1046
  @_Repeat
1047
  def testDelayedAndExpiringLockAcquire(self):
1048
    self._setUpLS()
1049
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1050

    
1051
    for expire in (False, True):
1052
      names = sorted(self.ls._names())
1053
      self.assertEqual(len(names), 8)
1054

    
1055
      lock_ev = dict([(i, threading.Event()) for i in names])
1056

    
1057
      # Lock all in exclusive mode
1058
      self.assert_(self.ls.acquire(names, shared=0))
1059

    
1060
      if expire:
1061
        # We'll wait at least 300ms per lock
1062
        lockwait = len(names) * [0.3]
1063

    
1064
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1065
        # this gives us up to 2.4s to fail.
1066
        lockall_timeout = 0.4
1067
      else:
1068
        # This should finish rather quickly
1069
        lockwait = None
1070
        lockall_timeout = len(names) * 5.0
1071

    
1072
      def _LockAll():
1073
        def acquire_notification(name):
1074
          if not expire:
1075
            self.done.put("getting %s" % name)
1076

    
1077
          # Kick next lock
1078
          lock_ev[name].set()
1079

    
1080
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1081
                           test_notify=acquire_notification):
1082
          self.done.put("got all")
1083
          self.ls.release()
1084
        else:
1085
          self.done.put("timeout on all")
1086

    
1087
        # Notify all locks
1088
        for ev in lock_ev.values():
1089
          ev.set()
1090

    
1091
      t = self._addThread(target=_LockAll)
1092

    
1093
      for idx, name in enumerate(names):
1094
        # Wait for actual acquire on this lock to start
1095
        lock_ev[name].wait(10.0)
1096

    
1097
        if expire and t.isAlive():
1098
          # Wait some time after getting the notification to make sure the lock
1099
          # acquire will expire
1100
          SafeSleep(lockwait[idx])
1101

    
1102
        self.ls.release(names=name)
1103

    
1104
      self.assertFalse(self.ls._list_owned())
1105

    
1106
      self._waitThreads()
1107

    
1108
      if expire:
1109
        # Not checking which locks were actually acquired. Doing so would be
1110
        # too timing-dependant.
1111
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1112
      else:
1113
        for i in names:
1114
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1115
        self.assertEqual(self.done.get_nowait(), "got all")
1116
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1117

    
1118
  @_Repeat
1119
  def testConcurrentRemove(self):
1120
    self.ls.add('four')
1121
    self.ls.acquire(['one', 'two', 'four'])
1122
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1123
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1124
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1125
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1126
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1127
    self.ls.remove('one')
1128
    self.ls.release()
1129
    self._waitThreads()
1130
    for i in range(4):
1131
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1132
    self.ls.add(['five', 'six'], acquired=1)
1133
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1134
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1135
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1136
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1137
    self.ls.remove('five')
1138
    self.ls.release()
1139
    self._waitThreads()
1140
    for i in range(4):
1141
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1142
    self.ls.acquire(['three', 'four'])
1143
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1144
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1145
    self.ls.remove('four')
1146
    self._waitThreads()
1147
    self.assertEqual(self.done.get_nowait(), ['six'])
1148
    self._addThread(target=self._doRemoveSet, args=(['two']))
1149
    self._waitThreads()
1150
    self.assertEqual(self.done.get_nowait(), ['two'])
1151
    self.ls.release()
1152
    # reset lockset
1153
    self._setUpLS()
1154

    
1155
  @_Repeat
1156
  def testConcurrentSharedSetLock(self):
1157
    # share the set-lock...
1158
    self.ls.acquire(None, shared=1)
1159
    # ...another thread can share it too
1160
    self._addThread(target=self._doLockSet, args=(None, 1))
1161
    self._waitThreads()
1162
    self.assertEqual(self.done.get_nowait(), 'DONE')
1163
    # ...or just share some elements
1164
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1165
    self._waitThreads()
1166
    self.assertEqual(self.done.get_nowait(), 'DONE')
1167
    # ...but not add new ones or remove any
1168
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1169
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1170
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1171
    # this just releases the set-lock
1172
    self.ls.release([])
1173
    t.join(60)
1174
    self.assertEqual(self.done.get_nowait(), 'DONE')
1175
    # release the lock on the actual elements so remove() can proceed too
1176
    self.ls.release()
1177
    self._waitThreads()
1178
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1179
    # reset lockset
1180
    self._setUpLS()
1181

    
1182
  @_Repeat
1183
  def testConcurrentExclusiveSetLock(self):
1184
    # acquire the set-lock...
1185
    self.ls.acquire(None, shared=0)
1186
    # ...no one can do anything else
1187
    self._addThread(target=self._doLockSet, args=(None, 1))
1188
    self._addThread(target=self._doLockSet, args=(None, 0))
1189
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1190
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1191
    self._addThread(target=self._doAddSet, args=(['nine']))
1192
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1193
    self.ls.release()
1194
    self._waitThreads()
1195
    for _ in range(5):
1196
      self.assertEqual(self.done.get(True, 1), 'DONE')
1197
    # cleanup
1198
    self._setUpLS()
1199

    
1200
  @_Repeat
1201
  def testConcurrentSetLockAdd(self):
1202
    self.ls.acquire('one')
1203
    # Another thread wants the whole SetLock
1204
    self._addThread(target=self._doLockSet, args=(None, 0))
1205
    self._addThread(target=self._doLockSet, args=(None, 1))
1206
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1207
    self.assertRaises(AssertionError, self.ls.add, 'four')
1208
    self.ls.release()
1209
    self._waitThreads()
1210
    self.assertEqual(self.done.get_nowait(), 'DONE')
1211
    self.assertEqual(self.done.get_nowait(), 'DONE')
1212
    self.ls.acquire(None)
1213
    self._addThread(target=self._doLockSet, args=(None, 0))
1214
    self._addThread(target=self._doLockSet, args=(None, 1))
1215
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1216
    self.ls.add('four')
1217
    self.ls.add('five', acquired=1)
1218
    self.ls.add('six', acquired=1, shared=1)
1219
    self.assertEquals(self.ls._list_owned(),
1220
      set(['one', 'two', 'three', 'five', 'six']))
1221
    self.assertEquals(self.ls._is_owned(), True)
1222
    self.assertEquals(self.ls._names(),
1223
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1224
    self.ls.release()
1225
    self._waitThreads()
1226
    self.assertEqual(self.done.get_nowait(), 'DONE')
1227
    self.assertEqual(self.done.get_nowait(), 'DONE')
1228
    self._setUpLS()
1229

    
1230
  @_Repeat
1231
  def testEmptyLockSet(self):
1232
    # get the set-lock
1233
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1234
    # now empty it...
1235
    self.ls.remove(['one', 'two', 'three'])
1236
    # and adds/locks by another thread still wait
1237
    self._addThread(target=self._doAddSet, args=(['nine']))
1238
    self._addThread(target=self._doLockSet, args=(None, 1))
1239
    self._addThread(target=self._doLockSet, args=(None, 0))
1240
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1241
    self.ls.release()
1242
    self._waitThreads()
1243
    for _ in range(3):
1244
      self.assertEqual(self.done.get_nowait(), 'DONE')
1245
    # empty it again...
1246
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1247
    # now share it...
1248
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1249
    # other sharers can go, adds still wait
1250
    self._addThread(target=self._doLockSet, args=(None, 1))
1251
    self._waitThreads()
1252
    self.assertEqual(self.done.get_nowait(), 'DONE')
1253
    self._addThread(target=self._doAddSet, args=(['nine']))
1254
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1255
    self.ls.release()
1256
    self._waitThreads()
1257
    self.assertEqual(self.done.get_nowait(), 'DONE')
1258
    self._setUpLS()
1259

    
1260

    
1261
class TestGanetiLockManager(_ThreadedTestCase):
1262

    
1263
  def setUp(self):
1264
    _ThreadedTestCase.setUp(self)
1265
    self.nodes=['n1', 'n2']
1266
    self.instances=['i1', 'i2', 'i3']
1267
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1268
                                        instances=self.instances)
1269

    
1270
  def tearDown(self):
1271
    # Don't try this at home...
1272
    locking.GanetiLockManager._instance = None
1273

    
1274
  def testLockingConstants(self):
1275
    # The locking library internally cheats by assuming its constants have some
1276
    # relationships with each other. Check those hold true.
1277
    # This relationship is also used in the Processor to recursively acquire
1278
    # the right locks. Again, please don't break it.
1279
    for i in range(len(locking.LEVELS)):
1280
      self.assertEqual(i, locking.LEVELS[i])
1281

    
1282
  def testDoubleGLFails(self):
1283
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1284

    
1285
  def testLockNames(self):
1286
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1287
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1288
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1289
                     set(self.instances))
1290

    
1291
  def testInitAndResources(self):
1292
    locking.GanetiLockManager._instance = None
1293
    self.GL = locking.GanetiLockManager([], [])
1294
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1295
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1296
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1297

    
1298
    locking.GanetiLockManager._instance = None
1299
    self.GL = locking.GanetiLockManager(self.nodes, [])
1300
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1301
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1302
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1303

    
1304
    locking.GanetiLockManager._instance = None
1305
    self.GL = locking.GanetiLockManager([], self.instances)
1306
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1307
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1308
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1309
                     set(self.instances))
1310

    
1311
  def testAcquireRelease(self):
1312
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1313
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1314
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1315
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1316
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1317
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1318
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1319
    self.GL.release(locking.LEVEL_NODE)
1320
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1321
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1322
    self.GL.release(locking.LEVEL_INSTANCE)
1323
    self.assertRaises(errors.LockError, self.GL.acquire,
1324
                      locking.LEVEL_INSTANCE, ['i5'])
1325
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1326
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1327

    
1328
  def testAcquireWholeSets(self):
1329
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1330
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1331
                      set(self.instances))
1332
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1333
                      set(self.instances))
1334
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1335
                      set(self.nodes))
1336
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1337
                      set(self.nodes))
1338
    self.GL.release(locking.LEVEL_NODE)
1339
    self.GL.release(locking.LEVEL_INSTANCE)
1340
    self.GL.release(locking.LEVEL_CLUSTER)
1341

    
1342
  def testAcquireWholeAndPartial(self):
1343
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1344
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1345
                      set(self.instances))
1346
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1347
                      set(self.instances))
1348
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1349
                      set(['n2']))
1350
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1351
                      set(['n2']))
1352
    self.GL.release(locking.LEVEL_NODE)
1353
    self.GL.release(locking.LEVEL_INSTANCE)
1354
    self.GL.release(locking.LEVEL_CLUSTER)
1355

    
1356
  def testBGLDependency(self):
1357
    self.assertRaises(AssertionError, self.GL.acquire,
1358
                      locking.LEVEL_NODE, ['n1', 'n2'])
1359
    self.assertRaises(AssertionError, self.GL.acquire,
1360
                      locking.LEVEL_INSTANCE, ['i3'])
1361
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1362
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1363
    self.assertRaises(AssertionError, self.GL.release,
1364
                      locking.LEVEL_CLUSTER, ['BGL'])
1365
    self.assertRaises(AssertionError, self.GL.release,
1366
                      locking.LEVEL_CLUSTER)
1367
    self.GL.release(locking.LEVEL_NODE)
1368
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1369
    self.assertRaises(AssertionError, self.GL.release,
1370
                      locking.LEVEL_CLUSTER, ['BGL'])
1371
    self.assertRaises(AssertionError, self.GL.release,
1372
                      locking.LEVEL_CLUSTER)
1373
    self.GL.release(locking.LEVEL_INSTANCE)
1374

    
1375
  def testWrongOrder(self):
1376
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1377
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1378
    self.assertRaises(AssertionError, self.GL.acquire,
1379
                      locking.LEVEL_NODE, ['n1'])
1380
    self.assertRaises(AssertionError, self.GL.acquire,
1381
                      locking.LEVEL_INSTANCE, ['i2'])
1382

    
1383
  # Helper function to run as a thread that shared the BGL and then acquires
1384
  # some locks at another level.
1385
  def _doLock(self, level, names, shared):
1386
    try:
1387
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1388
      self.GL.acquire(level, names, shared=shared)
1389
      self.done.put('DONE')
1390
      self.GL.release(level)
1391
      self.GL.release(locking.LEVEL_CLUSTER)
1392
    except errors.LockError:
1393
      self.done.put('ERR')
1394

    
1395
  @_Repeat
1396
  def testConcurrency(self):
1397
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1398
    self._addThread(target=self._doLock,
1399
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1400
    self._waitThreads()
1401
    self.assertEqual(self.done.get_nowait(), 'DONE')
1402
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1403
    self._addThread(target=self._doLock,
1404
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1405
    self._waitThreads()
1406
    self.assertEqual(self.done.get_nowait(), 'DONE')
1407
    self._addThread(target=self._doLock,
1408
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1409
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1410
    self.GL.release(locking.LEVEL_INSTANCE)
1411
    self._waitThreads()
1412
    self.assertEqual(self.done.get_nowait(), 'DONE')
1413
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1414
    self._addThread(target=self._doLock,
1415
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1416
    self._waitThreads()
1417
    self.assertEqual(self.done.get_nowait(), 'DONE')
1418
    self._addThread(target=self._doLock,
1419
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1420
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1421
    self.GL.release(locking.LEVEL_INSTANCE)
1422
    self._waitThreads()
1423
    self.assertEqual(self.done.get(True, 1), 'DONE')
1424
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1425

    
1426

    
1427
class TestLockMonitor(_ThreadedTestCase):
1428
  def setUp(self):
1429
    _ThreadedTestCase.setUp(self)
1430
    self.lm = locking.LockMonitor()
1431

    
1432
  def testSingleThread(self):
1433
    locks = []
1434

    
1435
    for i in range(100):
1436
      name = "TestLock%s" % i
1437
      locks.append(locking.SharedLock(name, monitor=self.lm))
1438

    
1439
    self.assertEqual(len(self.lm._locks), len(locks))
1440

    
1441
    # Delete all locks
1442
    del locks[:]
1443

    
1444
    # The garbage collector might needs some time
1445
    def _CheckLocks():
1446
      if self.lm._locks:
1447
        raise utils.RetryAgain()
1448

    
1449
    utils.Retry(_CheckLocks, 0.1, 30.0)
1450

    
1451
    self.assertFalse(self.lm._locks)
1452

    
1453
  def testMultiThread(self):
1454
    locks = []
1455

    
1456
    def _CreateLock(prev, next, name):
1457
      prev.wait()
1458
      locks.append(locking.SharedLock(name, monitor=self.lm))
1459
      if next:
1460
        next.set()
1461

    
1462
    expnames = []
1463

    
1464
    first = threading.Event()
1465
    prev = first
1466

    
1467
    # Use a deterministic random generator
1468
    for i in random.Random(4263).sample(range(100), 33):
1469
      name = "MtTestLock%s" % i
1470
      expnames.append(name)
1471

    
1472
      ev = threading.Event()
1473
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1474
      prev = ev
1475

    
1476
    # Add locks
1477
    first.set()
1478
    self._waitThreads()
1479

    
1480
    # Check order in which locks were added
1481
    self.assertEqual([i.name for i in locks], expnames)
1482

    
1483
    # Sync queries are not supported
1484
    self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1485

    
1486
    # Check query result
1487
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1488
                     [[name, None, None] for name in utils.NiceSort(expnames)])
1489

    
1490
    # Test exclusive acquire
1491
    for tlock in locks[::4]:
1492
      tlock.acquire(shared=0)
1493
      try:
1494
        def _GetExpResult(name):
1495
          if tlock.name == name:
1496
            return [name, "exclusive", [threading.currentThread().getName()]]
1497
          return [name, None, None]
1498

    
1499
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1500
                         [_GetExpResult(name)
1501
                          for name in utils.NiceSort(expnames)])
1502
      finally:
1503
        tlock.release()
1504

    
1505
    # Test shared acquire
1506
    def _Acquire(lock, shared, ev):
1507
      lock.acquire(shared=shared)
1508
      try:
1509
        ev.wait()
1510
      finally:
1511
        lock.release()
1512

    
1513
    for tlock1 in locks[::11]:
1514
      for tlock2 in locks[::-15]:
1515
        if tlock2 == tlock1:
1516
          continue
1517

    
1518
        for tlock3 in locks[::10]:
1519
          if tlock3 == tlock2:
1520
            continue
1521

    
1522
          ev = threading.Event()
1523

    
1524
          # Acquire locks
1525
          tthreads1 = []
1526
          for i in range(3):
1527
            tthreads1.append(self._addThread(target=_Acquire,
1528
                                             args=(tlock1, 1, ev)))
1529
          tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, ev))
1530
          tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, ev))
1531

    
1532
          # Check query result
1533
          for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1534
                                                         "owner"], False):
1535
            if name == tlock1.name:
1536
              self.assertEqual(mode, "shared")
1537
              self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1538
              continue
1539

    
1540
            if name == tlock2.name:
1541
              self.assertEqual(mode, "shared")
1542
              self.assertEqual(owner, [tthread2.getName()])
1543
              continue
1544

    
1545
            if name == tlock3.name:
1546
              self.assertEqual(mode, "exclusive")
1547
              self.assertEqual(owner, [tthread3.getName()])
1548
              continue
1549

    
1550
            self.assert_(name in expnames)
1551
            self.assert_(mode is None)
1552
            self.assert_(owner is None)
1553

    
1554
          # Release locks again
1555
          ev.set()
1556

    
1557
          self._waitThreads()
1558

    
1559
          self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1560
                           [[name, None, None]
1561
                            for name in utils.NiceSort(expnames)])
1562

    
1563
  def testDelete(self):
1564
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1565

    
1566
    self.assertEqual(len(self.lm._locks), 1)
1567
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1568
                     [[lock.name, None, None]])
1569

    
1570
    lock.delete()
1571

    
1572
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1573
                     [[lock.name, "deleted", None]])
1574
    self.assertEqual(len(self.lm._locks), 1)
1575

    
1576

    
1577
if __name__ == '__main__':
1578
  testutils.GanetiTestProgram()