Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 6760e4ed

History | View | Annotate | Download (54.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31

    
32
from ganeti import locking
33
from ganeti import errors
34
from ganeti import utils
35

    
36
import testutils
37

    
38

    
39
# This is used to test the ssynchronize decorator.
40
# Since it's passed as input to a decorator it must be declared as a global.
41
_decoratorlock = locking.SharedLock("decorator lock")
42

    
43
#: List for looping tests
44
ITERATIONS = range(8)
45

    
46

    
47
def _Repeat(fn):
48
  """Decorator for executing a function many times"""
49
  def wrapper(*args, **kwargs):
50
    for i in ITERATIONS:
51
      fn(*args, **kwargs)
52
  return wrapper
53

    
54

    
55
def SafeSleep(duration):
56
  start = time.time()
57
  while True:
58
    delay = start + duration - time.time()
59
    if delay <= 0.0:
60
      break
61
    time.sleep(delay)
62

    
63

    
64
class _ThreadedTestCase(unittest.TestCase):
65
  """Test class that supports adding/waiting on threads"""
66
  def setUp(self):
67
    unittest.TestCase.setUp(self)
68
    self.done = Queue.Queue(0)
69
    self.threads = []
70

    
71
  def _addThread(self, *args, **kwargs):
72
    """Create and remember a new thread"""
73
    t = threading.Thread(*args, **kwargs)
74
    self.threads.append(t)
75
    t.start()
76
    return t
77

    
78
  def _waitThreads(self):
79
    """Wait for all our threads to finish"""
80
    for t in self.threads:
81
      t.join(60)
82
      self.failIf(t.isAlive())
83
    self.threads = []
84

    
85

    
86
class _ConditionTestCase(_ThreadedTestCase):
87
  """Common test case for conditions"""
88

    
89
  def setUp(self, cls):
90
    _ThreadedTestCase.setUp(self)
91
    self.lock = threading.Lock()
92
    self.cond = cls(self.lock)
93

    
94
  def _testAcquireRelease(self):
95
    self.assertFalse(self.cond._is_owned())
96
    self.assertRaises(RuntimeError, self.cond.wait)
97
    self.assertRaises(RuntimeError, self.cond.notifyAll)
98

    
99
    self.cond.acquire()
100
    self.assert_(self.cond._is_owned())
101
    self.cond.notifyAll()
102
    self.assert_(self.cond._is_owned())
103
    self.cond.release()
104

    
105
    self.assertFalse(self.cond._is_owned())
106
    self.assertRaises(RuntimeError, self.cond.wait)
107
    self.assertRaises(RuntimeError, self.cond.notifyAll)
108

    
109
  def _testNotification(self):
110
    def _NotifyAll():
111
      self.done.put("NE")
112
      self.cond.acquire()
113
      self.done.put("NA")
114
      self.cond.notifyAll()
115
      self.done.put("NN")
116
      self.cond.release()
117

    
118
    self.cond.acquire()
119
    self._addThread(target=_NotifyAll)
120
    self.assertEqual(self.done.get(True, 1), "NE")
121
    self.assertRaises(Queue.Empty, self.done.get_nowait)
122
    self.cond.wait()
123
    self.assertEqual(self.done.get(True, 1), "NA")
124
    self.assertEqual(self.done.get(True, 1), "NN")
125
    self.assert_(self.cond._is_owned())
126
    self.cond.release()
127
    self.assertFalse(self.cond._is_owned())
128

    
129

    
130
class TestSingleNotifyPipeCondition(_ConditionTestCase):
131
  """SingleNotifyPipeCondition tests"""
132

    
133
  def setUp(self):
134
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
135

    
136
  def testAcquireRelease(self):
137
    self._testAcquireRelease()
138

    
139
  def testNotification(self):
140
    self._testNotification()
141

    
142
  def testWaitReuse(self):
143
    self.cond.acquire()
144
    self.cond.wait(0)
145
    self.cond.wait(0.1)
146
    self.cond.release()
147

    
148
  def testNoNotifyReuse(self):
149
    self.cond.acquire()
150
    self.cond.notifyAll()
151
    self.assertRaises(RuntimeError, self.cond.wait)
152
    self.assertRaises(RuntimeError, self.cond.notifyAll)
153
    self.cond.release()
154

    
155

    
156
class TestPipeCondition(_ConditionTestCase):
157
  """PipeCondition tests"""
158

    
159
  def setUp(self):
160
    _ConditionTestCase.setUp(self, locking.PipeCondition)
161

    
162
  def testAcquireRelease(self):
163
    self._testAcquireRelease()
164

    
165
  def testNotification(self):
166
    self._testNotification()
167

    
168
  def _TestWait(self, fn):
169
    threads = [
170
      self._addThread(target=fn),
171
      self._addThread(target=fn),
172
      self._addThread(target=fn),
173
      ]
174

    
175
    # Wait for threads to be waiting
176
    for _ in threads:
177
      self.assertEqual(self.done.get(True, 1), "A")
178

    
179
    self.assertRaises(Queue.Empty, self.done.get_nowait)
180

    
181
    self.cond.acquire()
182
    self.assertEqual(len(self.cond._waiters), 3)
183
    self.assertEqual(self.cond._waiters, set(threads))
184
    # This new thread can't acquire the lock, and thus call wait, before we
185
    # release it
186
    self._addThread(target=fn)
187
    self.cond.notifyAll()
188
    self.assertRaises(Queue.Empty, self.done.get_nowait)
189
    self.cond.release()
190

    
191
    # We should now get 3 W and 1 A (for the new thread) in whatever order
192
    w = 0
193
    a = 0
194
    for i in range(4):
195
      got = self.done.get(True, 1)
196
      if got == "W":
197
        w += 1
198
      elif got == "A":
199
        a += 1
200
      else:
201
        self.fail("Got %s on the done queue" % got)
202

    
203
    self.assertEqual(w, 3)
204
    self.assertEqual(a, 1)
205

    
206
    self.cond.acquire()
207
    self.cond.notifyAll()
208
    self.cond.release()
209
    self._waitThreads()
210
    self.assertEqual(self.done.get_nowait(), "W")
211
    self.assertRaises(Queue.Empty, self.done.get_nowait)
212

    
213
  def testBlockingWait(self):
214
    def _BlockingWait():
215
      self.cond.acquire()
216
      self.done.put("A")
217
      self.cond.wait()
218
      self.cond.release()
219
      self.done.put("W")
220

    
221
    self._TestWait(_BlockingWait)
222

    
223
  def testLongTimeoutWait(self):
224
    def _Helper():
225
      self.cond.acquire()
226
      self.done.put("A")
227
      self.cond.wait(15.0)
228
      self.cond.release()
229
      self.done.put("W")
230

    
231
    self._TestWait(_Helper)
232

    
233
  def _TimeoutWait(self, timeout, check):
234
    self.cond.acquire()
235
    self.cond.wait(timeout)
236
    self.cond.release()
237
    self.done.put(check)
238

    
239
  def testShortTimeoutWait(self):
240
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
241
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
242
    self._waitThreads()
243
    self.assertEqual(self.done.get_nowait(), "T1")
244
    self.assertEqual(self.done.get_nowait(), "T1")
245
    self.assertRaises(Queue.Empty, self.done.get_nowait)
246

    
247
  def testZeroTimeoutWait(self):
248
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
249
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
250
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
251
    self._waitThreads()
252
    self.assertEqual(self.done.get_nowait(), "T0")
253
    self.assertEqual(self.done.get_nowait(), "T0")
254
    self.assertEqual(self.done.get_nowait(), "T0")
255
    self.assertRaises(Queue.Empty, self.done.get_nowait)
256

    
257

    
258
class TestSharedLock(_ThreadedTestCase):
259
  """SharedLock tests"""
260

    
261
  def setUp(self):
262
    _ThreadedTestCase.setUp(self)
263
    self.sl = locking.SharedLock("TestSharedLock")
264

    
265
  def testSequenceAndOwnership(self):
266
    self.assertFalse(self.sl._is_owned())
267
    self.sl.acquire(shared=1)
268
    self.assert_(self.sl._is_owned())
269
    self.assert_(self.sl._is_owned(shared=1))
270
    self.assertFalse(self.sl._is_owned(shared=0))
271
    self.sl.release()
272
    self.assertFalse(self.sl._is_owned())
273
    self.sl.acquire()
274
    self.assert_(self.sl._is_owned())
275
    self.assertFalse(self.sl._is_owned(shared=1))
276
    self.assert_(self.sl._is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl._is_owned())
279
    self.sl.acquire(shared=1)
280
    self.assert_(self.sl._is_owned())
281
    self.assert_(self.sl._is_owned(shared=1))
282
    self.assertFalse(self.sl._is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl._is_owned())
285

    
286
  def testBooleanValue(self):
287
    # semaphores are supposed to return a true value on a successful acquire
288
    self.assert_(self.sl.acquire(shared=1))
289
    self.sl.release()
290
    self.assert_(self.sl.acquire())
291
    self.sl.release()
292

    
293
  def testDoubleLockingStoE(self):
294
    self.sl.acquire(shared=1)
295
    self.assertRaises(AssertionError, self.sl.acquire)
296

    
297
  def testDoubleLockingEtoS(self):
298
    self.sl.acquire()
299
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
300

    
301
  def testDoubleLockingStoS(self):
302
    self.sl.acquire(shared=1)
303
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
304

    
305
  def testDoubleLockingEtoE(self):
306
    self.sl.acquire()
307
    self.assertRaises(AssertionError, self.sl.acquire)
308

    
309
  # helper functions: called in a separate thread they acquire the lock, send
310
  # their identifier on the done queue, then release it.
311
  def _doItSharer(self):
312
    try:
313
      self.sl.acquire(shared=1)
314
      self.done.put('SHR')
315
      self.sl.release()
316
    except errors.LockError:
317
      self.done.put('ERR')
318

    
319
  def _doItExclusive(self):
320
    try:
321
      self.sl.acquire()
322
      self.done.put('EXC')
323
      self.sl.release()
324
    except errors.LockError:
325
      self.done.put('ERR')
326

    
327
  def _doItDelete(self):
328
    try:
329
      self.sl.delete()
330
      self.done.put('DEL')
331
    except errors.LockError:
332
      self.done.put('ERR')
333

    
334
  def testSharersCanCoexist(self):
335
    self.sl.acquire(shared=1)
336
    threading.Thread(target=self._doItSharer).start()
337
    self.assert_(self.done.get(True, 1))
338
    self.sl.release()
339

    
340
  @_Repeat
341
  def testExclusiveBlocksExclusive(self):
342
    self.sl.acquire()
343
    self._addThread(target=self._doItExclusive)
344
    self.assertRaises(Queue.Empty, self.done.get_nowait)
345
    self.sl.release()
346
    self._waitThreads()
347
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
348

    
349
  @_Repeat
350
  def testExclusiveBlocksDelete(self):
351
    self.sl.acquire()
352
    self._addThread(target=self._doItDelete)
353
    self.assertRaises(Queue.Empty, self.done.get_nowait)
354
    self.sl.release()
355
    self._waitThreads()
356
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
357
    self.sl = locking.SharedLock(self.sl.name)
358

    
359
  @_Repeat
360
  def testExclusiveBlocksSharer(self):
361
    self.sl.acquire()
362
    self._addThread(target=self._doItSharer)
363
    self.assertRaises(Queue.Empty, self.done.get_nowait)
364
    self.sl.release()
365
    self._waitThreads()
366
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
367

    
368
  @_Repeat
369
  def testSharerBlocksExclusive(self):
370
    self.sl.acquire(shared=1)
371
    self._addThread(target=self._doItExclusive)
372
    self.assertRaises(Queue.Empty, self.done.get_nowait)
373
    self.sl.release()
374
    self._waitThreads()
375
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
376

    
377
  @_Repeat
378
  def testSharerBlocksDelete(self):
379
    self.sl.acquire(shared=1)
380
    self._addThread(target=self._doItDelete)
381
    self.assertRaises(Queue.Empty, self.done.get_nowait)
382
    self.sl.release()
383
    self._waitThreads()
384
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
385
    self.sl = locking.SharedLock(self.sl.name)
386

    
387
  @_Repeat
388
  def testWaitingExclusiveBlocksSharer(self):
389
    """SKIPPED testWaitingExclusiveBlockSharer"""
390
    return
391

    
392
    self.sl.acquire(shared=1)
393
    # the lock is acquired in shared mode...
394
    self._addThread(target=self._doItExclusive)
395
    # ...but now an exclusive is waiting...
396
    self._addThread(target=self._doItSharer)
397
    # ...so the sharer should be blocked as well
398
    self.assertRaises(Queue.Empty, self.done.get_nowait)
399
    self.sl.release()
400
    self._waitThreads()
401
    # The exclusive passed before
402
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
403
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
404

    
405
  @_Repeat
406
  def testWaitingSharerBlocksExclusive(self):
407
    """SKIPPED testWaitingSharerBlocksExclusive"""
408
    return
409

    
410
    self.sl.acquire()
411
    # the lock is acquired in exclusive mode...
412
    self._addThread(target=self._doItSharer)
413
    # ...but now a sharer is waiting...
414
    self._addThread(target=self._doItExclusive)
415
    # ...the exclusive is waiting too...
416
    self.assertRaises(Queue.Empty, self.done.get_nowait)
417
    self.sl.release()
418
    self._waitThreads()
419
    # The sharer passed before
420
    self.assertEqual(self.done.get_nowait(), 'SHR')
421
    self.assertEqual(self.done.get_nowait(), 'EXC')
422

    
423
  def testDelete(self):
424
    self.sl.delete()
425
    self.assertRaises(errors.LockError, self.sl.acquire)
426
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
427
    self.assertRaises(errors.LockError, self.sl.delete)
428

    
429
  def testDeleteTimeout(self):
430
    self.sl.delete(timeout=60)
431

    
432
  def testNoDeleteIfSharer(self):
433
    self.sl.acquire(shared=1)
434
    self.assertRaises(AssertionError, self.sl.delete)
435

    
436
  @_Repeat
437
  def testDeletePendingSharersExclusiveDelete(self):
438
    self.sl.acquire()
439
    self._addThread(target=self._doItSharer)
440
    self._addThread(target=self._doItSharer)
441
    self._addThread(target=self._doItExclusive)
442
    self._addThread(target=self._doItDelete)
443
    self.sl.delete()
444
    self._waitThreads()
445
    # The threads who were pending return ERR
446
    for _ in range(4):
447
      self.assertEqual(self.done.get_nowait(), 'ERR')
448
    self.sl = locking.SharedLock(self.sl.name)
449

    
450
  @_Repeat
451
  def testDeletePendingDeleteExclusiveSharers(self):
452
    self.sl.acquire()
453
    self._addThread(target=self._doItDelete)
454
    self._addThread(target=self._doItExclusive)
455
    self._addThread(target=self._doItSharer)
456
    self._addThread(target=self._doItSharer)
457
    self.sl.delete()
458
    self._waitThreads()
459
    # The two threads who were pending return both ERR
460
    self.assertEqual(self.done.get_nowait(), 'ERR')
461
    self.assertEqual(self.done.get_nowait(), 'ERR')
462
    self.assertEqual(self.done.get_nowait(), 'ERR')
463
    self.assertEqual(self.done.get_nowait(), 'ERR')
464
    self.sl = locking.SharedLock(self.sl.name)
465

    
466
  @_Repeat
467
  def testExclusiveAcquireTimeout(self):
468
    for shared in [0, 1]:
469
      on_queue = threading.Event()
470
      release_exclusive = threading.Event()
471

    
472
      def _LockExclusive():
473
        self.sl.acquire(shared=0, test_notify=on_queue.set)
474
        self.done.put("A: start wait")
475
        release_exclusive.wait()
476
        self.done.put("A: end wait")
477
        self.sl.release()
478

    
479
      # Start thread to hold lock in exclusive mode
480
      self._addThread(target=_LockExclusive)
481

    
482
      # Wait for wait to begin
483
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
484

    
485
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
486
      # on the queue
487
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
488
                                      test_notify=release_exclusive.set))
489

    
490
      self.done.put("got 2nd")
491
      self.sl.release()
492

    
493
      self._waitThreads()
494

    
495
      self.assertEqual(self.done.get_nowait(), "A: end wait")
496
      self.assertEqual(self.done.get_nowait(), "got 2nd")
497
      self.assertRaises(Queue.Empty, self.done.get_nowait)
498

    
499
  @_Repeat
500
  def testAcquireExpiringTimeout(self):
501
    def _AcquireWithTimeout(shared, timeout):
502
      if not self.sl.acquire(shared=shared, timeout=timeout):
503
        self.done.put("timeout")
504

    
505
    for shared in [0, 1]:
506
      # Lock exclusively
507
      self.sl.acquire()
508

    
509
      # Start shared acquires with timeout between 0 and 20 ms
510
      for i in range(11):
511
        self._addThread(target=_AcquireWithTimeout,
512
                        args=(shared, i * 2.0 / 1000.0))
513

    
514
      # Wait for threads to finish (makes sure the acquire timeout expires
515
      # before releasing the lock)
516
      self._waitThreads()
517

    
518
      # Release lock
519
      self.sl.release()
520

    
521
      for _ in range(11):
522
        self.assertEqual(self.done.get_nowait(), "timeout")
523

    
524
      self.assertRaises(Queue.Empty, self.done.get_nowait)
525

    
526
  @_Repeat
527
  def testSharedSkipExclusiveAcquires(self):
528
    # Tests whether shared acquires jump in front of exclusive acquires in the
529
    # queue.
530

    
531
    def _Acquire(shared, name, notify_ev, wait_ev):
532
      if notify_ev:
533
        notify_fn = notify_ev.set
534
      else:
535
        notify_fn = None
536

    
537
      if wait_ev:
538
        wait_ev.wait()
539

    
540
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
541
        return
542

    
543
      self.done.put(name)
544
      self.sl.release()
545

    
546
    # Get exclusive lock while we fill the queue
547
    self.sl.acquire()
548

    
549
    shrcnt1 = 5
550
    shrcnt2 = 7
551
    shrcnt3 = 9
552
    shrcnt4 = 2
553

    
554
    # Add acquires using threading.Event for synchronization. They'll be
555
    # acquired exactly in the order defined in this list.
556
    acquires = (shrcnt1 * [(1, "shared 1")] +
557
                3 * [(0, "exclusive 1")] +
558
                shrcnt2 * [(1, "shared 2")] +
559
                shrcnt3 * [(1, "shared 3")] +
560
                shrcnt4 * [(1, "shared 4")] +
561
                3 * [(0, "exclusive 2")])
562

    
563
    ev_cur = None
564
    ev_prev = None
565

    
566
    for args in acquires:
567
      ev_cur = threading.Event()
568
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
569
      ev_prev = ev_cur
570

    
571
    # Wait for last acquire to start
572
    ev_prev.wait()
573

    
574
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
575
    # together
576
    self.assertEqual(self.sl._count_pending(), 7)
577

    
578
    # Release exclusive lock and wait
579
    self.sl.release()
580

    
581
    self._waitThreads()
582

    
583
    # Check sequence
584
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
585
      # Shared locks aren't guaranteed to be notified in order, but they'll be
586
      # first
587
      tmp = self.done.get_nowait()
588
      if tmp == "shared 1":
589
        shrcnt1 -= 1
590
      elif tmp == "shared 2":
591
        shrcnt2 -= 1
592
      elif tmp == "shared 3":
593
        shrcnt3 -= 1
594
      elif tmp == "shared 4":
595
        shrcnt4 -= 1
596
    self.assertEqual(shrcnt1, 0)
597
    self.assertEqual(shrcnt2, 0)
598
    self.assertEqual(shrcnt3, 0)
599
    self.assertEqual(shrcnt3, 0)
600

    
601
    for _ in range(3):
602
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
603

    
604
    for _ in range(3):
605
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
606

    
607
    self.assertRaises(Queue.Empty, self.done.get_nowait)
608

    
609
  @_Repeat
610
  def testMixedAcquireTimeout(self):
611
    sync = threading.Event()
612

    
613
    def _AcquireShared(ev):
614
      if not self.sl.acquire(shared=1, timeout=None):
615
        return
616

    
617
      self.done.put("shared")
618

    
619
      # Notify main thread
620
      ev.set()
621

    
622
      # Wait for notification from main thread
623
      sync.wait()
624

    
625
      # Release lock
626
      self.sl.release()
627

    
628
    acquires = []
629
    for _ in range(3):
630
      ev = threading.Event()
631
      self._addThread(target=_AcquireShared, args=(ev, ))
632
      acquires.append(ev)
633

    
634
    # Wait for all acquires to finish
635
    for i in acquires:
636
      i.wait()
637

    
638
    self.assertEqual(self.sl._count_pending(), 0)
639

    
640
    # Try to get exclusive lock
641
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
642

    
643
    # Acquire exclusive without timeout
644
    exclsync = threading.Event()
645
    exclev = threading.Event()
646

    
647
    def _AcquireExclusive():
648
      if not self.sl.acquire(shared=0):
649
        return
650

    
651
      self.done.put("exclusive")
652

    
653
      # Notify main thread
654
      exclev.set()
655

    
656
      # Wait for notification from main thread
657
      exclsync.wait()
658

    
659
      self.sl.release()
660

    
661
    self._addThread(target=_AcquireExclusive)
662

    
663
    # Try to get exclusive lock
664
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
665

    
666
    # Make all shared holders release their locks
667
    sync.set()
668

    
669
    # Wait for exclusive acquire to succeed
670
    exclev.wait()
671

    
672
    self.assertEqual(self.sl._count_pending(), 0)
673

    
674
    # Try to get exclusive lock
675
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
676

    
677
    def _AcquireSharedSimple():
678
      if self.sl.acquire(shared=1, timeout=None):
679
        self.done.put("shared2")
680
        self.sl.release()
681

    
682
    for _ in range(10):
683
      self._addThread(target=_AcquireSharedSimple)
684

    
685
    # Tell exclusive lock to release
686
    exclsync.set()
687

    
688
    # Wait for everything to finish
689
    self._waitThreads()
690

    
691
    self.assertEqual(self.sl._count_pending(), 0)
692

    
693
    # Check sequence
694
    for _ in range(3):
695
      self.assertEqual(self.done.get_nowait(), "shared")
696

    
697
    self.assertEqual(self.done.get_nowait(), "exclusive")
698

    
699
    for _ in range(10):
700
      self.assertEqual(self.done.get_nowait(), "shared2")
701

    
702
    self.assertRaises(Queue.Empty, self.done.get_nowait)
703

    
704

    
705
class TestSharedLockInCondition(_ThreadedTestCase):
706
  """SharedLock as a condition lock tests"""
707

    
708
  def setUp(self):
709
    _ThreadedTestCase.setUp(self)
710
    self.sl = locking.SharedLock("TestSharedLockInCondition")
711
    self.setCondition()
712

    
713
  def setCondition(self):
714
    self.cond = threading.Condition(self.sl)
715

    
716
  def testKeepMode(self):
717
    self.cond.acquire(shared=1)
718
    self.assert_(self.sl._is_owned(shared=1))
719
    self.cond.wait(0)
720
    self.assert_(self.sl._is_owned(shared=1))
721
    self.cond.release()
722
    self.cond.acquire(shared=0)
723
    self.assert_(self.sl._is_owned(shared=0))
724
    self.cond.wait(0)
725
    self.assert_(self.sl._is_owned(shared=0))
726
    self.cond.release()
727

    
728

    
729
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
730
  """SharedLock as a pipe condition lock tests"""
731

    
732
  def setCondition(self):
733
    self.cond = locking.PipeCondition(self.sl)
734

    
735

    
736
class TestSSynchronizedDecorator(_ThreadedTestCase):
737
  """Shared Lock Synchronized decorator test"""
738

    
739
  def setUp(self):
740
    _ThreadedTestCase.setUp(self)
741

    
742
  @locking.ssynchronized(_decoratorlock)
743
  def _doItExclusive(self):
744
    self.assert_(_decoratorlock._is_owned())
745
    self.done.put('EXC')
746

    
747
  @locking.ssynchronized(_decoratorlock, shared=1)
748
  def _doItSharer(self):
749
    self.assert_(_decoratorlock._is_owned(shared=1))
750
    self.done.put('SHR')
751

    
752
  def testDecoratedFunctions(self):
753
    self._doItExclusive()
754
    self.assertFalse(_decoratorlock._is_owned())
755
    self._doItSharer()
756
    self.assertFalse(_decoratorlock._is_owned())
757

    
758
  def testSharersCanCoexist(self):
759
    _decoratorlock.acquire(shared=1)
760
    threading.Thread(target=self._doItSharer).start()
761
    self.assert_(self.done.get(True, 1))
762
    _decoratorlock.release()
763

    
764
  @_Repeat
765
  def testExclusiveBlocksExclusive(self):
766
    _decoratorlock.acquire()
767
    self._addThread(target=self._doItExclusive)
768
    # give it a bit of time to check that it's not actually doing anything
769
    self.assertRaises(Queue.Empty, self.done.get_nowait)
770
    _decoratorlock.release()
771
    self._waitThreads()
772
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
773

    
774
  @_Repeat
775
  def testExclusiveBlocksSharer(self):
776
    _decoratorlock.acquire()
777
    self._addThread(target=self._doItSharer)
778
    self.assertRaises(Queue.Empty, self.done.get_nowait)
779
    _decoratorlock.release()
780
    self._waitThreads()
781
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
782

    
783
  @_Repeat
784
  def testSharerBlocksExclusive(self):
785
    _decoratorlock.acquire(shared=1)
786
    self._addThread(target=self._doItExclusive)
787
    self.assertRaises(Queue.Empty, self.done.get_nowait)
788
    _decoratorlock.release()
789
    self._waitThreads()
790
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
791

    
792

    
793
class TestLockSet(_ThreadedTestCase):
794
  """LockSet tests"""
795

    
796
  def setUp(self):
797
    _ThreadedTestCase.setUp(self)
798
    self._setUpLS()
799

    
800
  def _setUpLS(self):
801
    """Helper to (re)initialize the lock set"""
802
    self.resources = ['one', 'two', 'three']
803
    self.ls = locking.LockSet(self.resources, "TestLockSet")
804

    
805
  def testResources(self):
806
    self.assertEquals(self.ls._names(), set(self.resources))
807
    newls = locking.LockSet([], "TestLockSet.testResources")
808
    self.assertEquals(newls._names(), set())
809

    
810
  def testAcquireRelease(self):
811
    self.assert_(self.ls.acquire('one'))
812
    self.assertEquals(self.ls._list_owned(), set(['one']))
813
    self.ls.release()
814
    self.assertEquals(self.ls._list_owned(), set())
815
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
816
    self.assertEquals(self.ls._list_owned(), set(['one']))
817
    self.ls.release()
818
    self.assertEquals(self.ls._list_owned(), set())
819
    self.ls.acquire(['one', 'two', 'three'])
820
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
821
    self.ls.release('one')
822
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
823
    self.ls.release(['three'])
824
    self.assertEquals(self.ls._list_owned(), set(['two']))
825
    self.ls.release()
826
    self.assertEquals(self.ls._list_owned(), set())
827
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
828
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
829
    self.ls.release()
830
    self.assertEquals(self.ls._list_owned(), set())
831

    
832
  def testNoDoubleAcquire(self):
833
    self.ls.acquire('one')
834
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
835
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
836
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
837
    self.ls.release()
838
    self.ls.acquire(['one', 'three'])
839
    self.ls.release('one')
840
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
841
    self.ls.release('three')
842

    
843
  def testNoWrongRelease(self):
844
    self.assertRaises(AssertionError, self.ls.release)
845
    self.ls.acquire('one')
846
    self.assertRaises(AssertionError, self.ls.release, 'two')
847

    
848
  def testAddRemove(self):
849
    self.ls.add('four')
850
    self.assertEquals(self.ls._list_owned(), set())
851
    self.assert_('four' in self.ls._names())
852
    self.ls.add(['five', 'six', 'seven'], acquired=1)
853
    self.assert_('five' in self.ls._names())
854
    self.assert_('six' in self.ls._names())
855
    self.assert_('seven' in self.ls._names())
856
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
857
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
858
    self.assert_('five' not in self.ls._names())
859
    self.assert_('six' not in self.ls._names())
860
    self.assertEquals(self.ls._list_owned(), set(['seven']))
861
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
862
    self.ls.remove('seven')
863
    self.assert_('seven' not in self.ls._names())
864
    self.assertEquals(self.ls._list_owned(), set([]))
865
    self.ls.acquire(None, shared=1)
866
    self.assertRaises(AssertionError, self.ls.add, 'eight')
867
    self.ls.release()
868
    self.ls.acquire(None)
869
    self.ls.add('eight', acquired=1)
870
    self.assert_('eight' in self.ls._names())
871
    self.assert_('eight' in self.ls._list_owned())
872
    self.ls.add('nine')
873
    self.assert_('nine' in self.ls._names())
874
    self.assert_('nine' not in self.ls._list_owned())
875
    self.ls.release()
876
    self.ls.remove(['two'])
877
    self.assert_('two' not in self.ls._names())
878
    self.ls.acquire('three')
879
    self.assertEquals(self.ls.remove(['three']), ['three'])
880
    self.assert_('three' not in self.ls._names())
881
    self.assertEquals(self.ls.remove('three'), [])
882
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
883
    self.assert_('one' not in self.ls._names())
884

    
885
  def testRemoveNonBlocking(self):
886
    self.ls.acquire('one')
887
    self.assertEquals(self.ls.remove('one'), ['one'])
888
    self.ls.acquire(['two', 'three'])
889
    self.assertEquals(self.ls.remove(['two', 'three']),
890
                      ['two', 'three'])
891

    
892
  def testNoDoubleAdd(self):
893
    self.assertRaises(errors.LockError, self.ls.add, 'two')
894
    self.ls.add('four')
895
    self.assertRaises(errors.LockError, self.ls.add, 'four')
896

    
897
  def testNoWrongRemoves(self):
898
    self.ls.acquire(['one', 'three'], shared=1)
899
    # Cannot remove 'two' while holding something which is not a superset
900
    self.assertRaises(AssertionError, self.ls.remove, 'two')
901
    # Cannot remove 'three' as we are sharing it
902
    self.assertRaises(AssertionError, self.ls.remove, 'three')
903

    
904
  def testAcquireSetLock(self):
905
    # acquire the set-lock exclusively
906
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
907
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
908
    self.assertEquals(self.ls._is_owned(), True)
909
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
910
    # I can still add/remove elements...
911
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
912
    self.assert_(self.ls.add('six'))
913
    self.ls.release()
914
    # share the set-lock
915
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
916
    # adding new elements is not possible
917
    self.assertRaises(AssertionError, self.ls.add, 'five')
918
    self.ls.release()
919

    
920
  def testAcquireWithRepetitions(self):
921
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
922
                      set(['two', 'two', 'three']))
923
    self.ls.release(['two', 'two'])
924
    self.assertEquals(self.ls._list_owned(), set(['three']))
925

    
926
  def testEmptyAcquire(self):
927
    # Acquire an empty list of locks...
928
    self.assertEquals(self.ls.acquire([]), set())
929
    self.assertEquals(self.ls._list_owned(), set())
930
    # New locks can still be addded
931
    self.assert_(self.ls.add('six'))
932
    # "re-acquiring" is not an issue, since we had really acquired nothing
933
    self.assertEquals(self.ls.acquire([], shared=1), set())
934
    self.assertEquals(self.ls._list_owned(), set())
935
    # We haven't really acquired anything, so we cannot release
936
    self.assertRaises(AssertionError, self.ls.release)
937

    
938
  def _doLockSet(self, names, shared):
939
    try:
940
      self.ls.acquire(names, shared=shared)
941
      self.done.put('DONE')
942
      self.ls.release()
943
    except errors.LockError:
944
      self.done.put('ERR')
945

    
946
  def _doAddSet(self, names):
947
    try:
948
      self.ls.add(names, acquired=1)
949
      self.done.put('DONE')
950
      self.ls.release()
951
    except errors.LockError:
952
      self.done.put('ERR')
953

    
954
  def _doRemoveSet(self, names):
955
    self.done.put(self.ls.remove(names))
956

    
957
  @_Repeat
958
  def testConcurrentSharedAcquire(self):
959
    self.ls.acquire(['one', 'two'], shared=1)
960
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
961
    self._waitThreads()
962
    self.assertEqual(self.done.get_nowait(), 'DONE')
963
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
964
    self._waitThreads()
965
    self.assertEqual(self.done.get_nowait(), 'DONE')
966
    self._addThread(target=self._doLockSet, args=('three', 1))
967
    self._waitThreads()
968
    self.assertEqual(self.done.get_nowait(), 'DONE')
969
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
970
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
971
    self.assertRaises(Queue.Empty, self.done.get_nowait)
972
    self.ls.release()
973
    self._waitThreads()
974
    self.assertEqual(self.done.get_nowait(), 'DONE')
975
    self.assertEqual(self.done.get_nowait(), 'DONE')
976

    
977
  @_Repeat
978
  def testConcurrentExclusiveAcquire(self):
979
    self.ls.acquire(['one', 'two'])
980
    self._addThread(target=self._doLockSet, args=('three', 1))
981
    self._waitThreads()
982
    self.assertEqual(self.done.get_nowait(), 'DONE')
983
    self._addThread(target=self._doLockSet, args=('three', 0))
984
    self._waitThreads()
985
    self.assertEqual(self.done.get_nowait(), 'DONE')
986
    self.assertRaises(Queue.Empty, self.done.get_nowait)
987
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
988
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
989
    self._addThread(target=self._doLockSet, args=('one', 0))
990
    self._addThread(target=self._doLockSet, args=('one', 1))
991
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
992
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
993
    self.assertRaises(Queue.Empty, self.done.get_nowait)
994
    self.ls.release()
995
    self._waitThreads()
996
    for _ in range(6):
997
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
998

    
999
  @_Repeat
1000
  def testSimpleAcquireTimeoutExpiring(self):
1001
    names = sorted(self.ls._names())
1002
    self.assert_(len(names) >= 3)
1003

    
1004
    # Get name of first lock
1005
    first = names[0]
1006

    
1007
    # Get name of last lock
1008
    last = names.pop()
1009

    
1010
    checks = [
1011
      # Block first and try to lock it again
1012
      (first, first),
1013

    
1014
      # Block last and try to lock all locks
1015
      (None, first),
1016

    
1017
      # Block last and try to lock it again
1018
      (last, last),
1019
      ]
1020

    
1021
    for (wanted, block) in checks:
1022
      # Lock in exclusive mode
1023
      self.assert_(self.ls.acquire(block, shared=0))
1024

    
1025
      def _AcquireOne():
1026
        # Try to get the same lock again with a timeout (should never succeed)
1027
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1028
        if acquired:
1029
          self.done.put("acquired")
1030
          self.ls.release()
1031
        else:
1032
          self.assert_(acquired is None)
1033
          self.assertFalse(self.ls._list_owned())
1034
          self.assertFalse(self.ls._is_owned())
1035
          self.done.put("not acquired")
1036

    
1037
      self._addThread(target=_AcquireOne)
1038

    
1039
      # Wait for timeout in thread to expire
1040
      self._waitThreads()
1041

    
1042
      # Release exclusive lock again
1043
      self.ls.release()
1044

    
1045
      self.assertEqual(self.done.get_nowait(), "not acquired")
1046
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1047

    
1048
  @_Repeat
1049
  def testDelayedAndExpiringLockAcquire(self):
1050
    self._setUpLS()
1051
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1052

    
1053
    for expire in (False, True):
1054
      names = sorted(self.ls._names())
1055
      self.assertEqual(len(names), 8)
1056

    
1057
      lock_ev = dict([(i, threading.Event()) for i in names])
1058

    
1059
      # Lock all in exclusive mode
1060
      self.assert_(self.ls.acquire(names, shared=0))
1061

    
1062
      if expire:
1063
        # We'll wait at least 300ms per lock
1064
        lockwait = len(names) * [0.3]
1065

    
1066
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1067
        # this gives us up to 2.4s to fail.
1068
        lockall_timeout = 0.4
1069
      else:
1070
        # This should finish rather quickly
1071
        lockwait = None
1072
        lockall_timeout = len(names) * 5.0
1073

    
1074
      def _LockAll():
1075
        def acquire_notification(name):
1076
          if not expire:
1077
            self.done.put("getting %s" % name)
1078

    
1079
          # Kick next lock
1080
          lock_ev[name].set()
1081

    
1082
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1083
                           test_notify=acquire_notification):
1084
          self.done.put("got all")
1085
          self.ls.release()
1086
        else:
1087
          self.done.put("timeout on all")
1088

    
1089
        # Notify all locks
1090
        for ev in lock_ev.values():
1091
          ev.set()
1092

    
1093
      t = self._addThread(target=_LockAll)
1094

    
1095
      for idx, name in enumerate(names):
1096
        # Wait for actual acquire on this lock to start
1097
        lock_ev[name].wait(10.0)
1098

    
1099
        if expire and t.isAlive():
1100
          # Wait some time after getting the notification to make sure the lock
1101
          # acquire will expire
1102
          SafeSleep(lockwait[idx])
1103

    
1104
        self.ls.release(names=name)
1105

    
1106
      self.assertFalse(self.ls._list_owned())
1107

    
1108
      self._waitThreads()
1109

    
1110
      if expire:
1111
        # Not checking which locks were actually acquired. Doing so would be
1112
        # too timing-dependant.
1113
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1114
      else:
1115
        for i in names:
1116
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1117
        self.assertEqual(self.done.get_nowait(), "got all")
1118
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1119

    
1120
  @_Repeat
1121
  def testConcurrentRemove(self):
1122
    self.ls.add('four')
1123
    self.ls.acquire(['one', 'two', 'four'])
1124
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1125
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1126
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1127
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1128
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1129
    self.ls.remove('one')
1130
    self.ls.release()
1131
    self._waitThreads()
1132
    for i in range(4):
1133
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1134
    self.ls.add(['five', 'six'], acquired=1)
1135
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1136
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1137
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1138
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1139
    self.ls.remove('five')
1140
    self.ls.release()
1141
    self._waitThreads()
1142
    for i in range(4):
1143
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1144
    self.ls.acquire(['three', 'four'])
1145
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1146
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1147
    self.ls.remove('four')
1148
    self._waitThreads()
1149
    self.assertEqual(self.done.get_nowait(), ['six'])
1150
    self._addThread(target=self._doRemoveSet, args=(['two']))
1151
    self._waitThreads()
1152
    self.assertEqual(self.done.get_nowait(), ['two'])
1153
    self.ls.release()
1154
    # reset lockset
1155
    self._setUpLS()
1156

    
1157
  @_Repeat
1158
  def testConcurrentSharedSetLock(self):
1159
    # share the set-lock...
1160
    self.ls.acquire(None, shared=1)
1161
    # ...another thread can share it too
1162
    self._addThread(target=self._doLockSet, args=(None, 1))
1163
    self._waitThreads()
1164
    self.assertEqual(self.done.get_nowait(), 'DONE')
1165
    # ...or just share some elements
1166
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1167
    self._waitThreads()
1168
    self.assertEqual(self.done.get_nowait(), 'DONE')
1169
    # ...but not add new ones or remove any
1170
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1171
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1172
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1173
    # this just releases the set-lock
1174
    self.ls.release([])
1175
    t.join(60)
1176
    self.assertEqual(self.done.get_nowait(), 'DONE')
1177
    # release the lock on the actual elements so remove() can proceed too
1178
    self.ls.release()
1179
    self._waitThreads()
1180
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1181
    # reset lockset
1182
    self._setUpLS()
1183

    
1184
  @_Repeat
1185
  def testConcurrentExclusiveSetLock(self):
1186
    # acquire the set-lock...
1187
    self.ls.acquire(None, shared=0)
1188
    # ...no one can do anything else
1189
    self._addThread(target=self._doLockSet, args=(None, 1))
1190
    self._addThread(target=self._doLockSet, args=(None, 0))
1191
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1192
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1193
    self._addThread(target=self._doAddSet, args=(['nine']))
1194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1195
    self.ls.release()
1196
    self._waitThreads()
1197
    for _ in range(5):
1198
      self.assertEqual(self.done.get(True, 1), 'DONE')
1199
    # cleanup
1200
    self._setUpLS()
1201

    
1202
  @_Repeat
1203
  def testConcurrentSetLockAdd(self):
1204
    self.ls.acquire('one')
1205
    # Another thread wants the whole SetLock
1206
    self._addThread(target=self._doLockSet, args=(None, 0))
1207
    self._addThread(target=self._doLockSet, args=(None, 1))
1208
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1209
    self.assertRaises(AssertionError, self.ls.add, 'four')
1210
    self.ls.release()
1211
    self._waitThreads()
1212
    self.assertEqual(self.done.get_nowait(), 'DONE')
1213
    self.assertEqual(self.done.get_nowait(), 'DONE')
1214
    self.ls.acquire(None)
1215
    self._addThread(target=self._doLockSet, args=(None, 0))
1216
    self._addThread(target=self._doLockSet, args=(None, 1))
1217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1218
    self.ls.add('four')
1219
    self.ls.add('five', acquired=1)
1220
    self.ls.add('six', acquired=1, shared=1)
1221
    self.assertEquals(self.ls._list_owned(),
1222
      set(['one', 'two', 'three', 'five', 'six']))
1223
    self.assertEquals(self.ls._is_owned(), True)
1224
    self.assertEquals(self.ls._names(),
1225
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1226
    self.ls.release()
1227
    self._waitThreads()
1228
    self.assertEqual(self.done.get_nowait(), 'DONE')
1229
    self.assertEqual(self.done.get_nowait(), 'DONE')
1230
    self._setUpLS()
1231

    
1232
  @_Repeat
1233
  def testEmptyLockSet(self):
1234
    # get the set-lock
1235
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1236
    # now empty it...
1237
    self.ls.remove(['one', 'two', 'three'])
1238
    # and adds/locks by another thread still wait
1239
    self._addThread(target=self._doAddSet, args=(['nine']))
1240
    self._addThread(target=self._doLockSet, args=(None, 1))
1241
    self._addThread(target=self._doLockSet, args=(None, 0))
1242
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1243
    self.ls.release()
1244
    self._waitThreads()
1245
    for _ in range(3):
1246
      self.assertEqual(self.done.get_nowait(), 'DONE')
1247
    # empty it again...
1248
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1249
    # now share it...
1250
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1251
    # other sharers can go, adds still wait
1252
    self._addThread(target=self._doLockSet, args=(None, 1))
1253
    self._waitThreads()
1254
    self.assertEqual(self.done.get_nowait(), 'DONE')
1255
    self._addThread(target=self._doAddSet, args=(['nine']))
1256
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1257
    self.ls.release()
1258
    self._waitThreads()
1259
    self.assertEqual(self.done.get_nowait(), 'DONE')
1260
    self._setUpLS()
1261

    
1262

    
1263
class TestGanetiLockManager(_ThreadedTestCase):
1264

    
1265
  def setUp(self):
1266
    _ThreadedTestCase.setUp(self)
1267
    self.nodes=['n1', 'n2']
1268
    self.instances=['i1', 'i2', 'i3']
1269
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1270
                                        instances=self.instances)
1271

    
1272
  def tearDown(self):
1273
    # Don't try this at home...
1274
    locking.GanetiLockManager._instance = None
1275

    
1276
  def testLockingConstants(self):
1277
    # The locking library internally cheats by assuming its constants have some
1278
    # relationships with each other. Check those hold true.
1279
    # This relationship is also used in the Processor to recursively acquire
1280
    # the right locks. Again, please don't break it.
1281
    for i in range(len(locking.LEVELS)):
1282
      self.assertEqual(i, locking.LEVELS[i])
1283

    
1284
  def testDoubleGLFails(self):
1285
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1286

    
1287
  def testLockNames(self):
1288
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1289
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1290
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1291
                     set(self.instances))
1292

    
1293
  def testInitAndResources(self):
1294
    locking.GanetiLockManager._instance = None
1295
    self.GL = locking.GanetiLockManager([], [])
1296
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1297
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1298
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1299

    
1300
    locking.GanetiLockManager._instance = None
1301
    self.GL = locking.GanetiLockManager(self.nodes, [])
1302
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1303
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1304
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1305

    
1306
    locking.GanetiLockManager._instance = None
1307
    self.GL = locking.GanetiLockManager([], self.instances)
1308
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1309
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1310
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1311
                     set(self.instances))
1312

    
1313
  def testAcquireRelease(self):
1314
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1315
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1316
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1317
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1318
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1319
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1320
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1321
    self.GL.release(locking.LEVEL_NODE)
1322
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1323
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1324
    self.GL.release(locking.LEVEL_INSTANCE)
1325
    self.assertRaises(errors.LockError, self.GL.acquire,
1326
                      locking.LEVEL_INSTANCE, ['i5'])
1327
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1328
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1329

    
1330
  def testAcquireWholeSets(self):
1331
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1332
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1333
                      set(self.instances))
1334
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1335
                      set(self.instances))
1336
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1337
                      set(self.nodes))
1338
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1339
                      set(self.nodes))
1340
    self.GL.release(locking.LEVEL_NODE)
1341
    self.GL.release(locking.LEVEL_INSTANCE)
1342
    self.GL.release(locking.LEVEL_CLUSTER)
1343

    
1344
  def testAcquireWholeAndPartial(self):
1345
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1346
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1347
                      set(self.instances))
1348
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1349
                      set(self.instances))
1350
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1351
                      set(['n2']))
1352
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1353
                      set(['n2']))
1354
    self.GL.release(locking.LEVEL_NODE)
1355
    self.GL.release(locking.LEVEL_INSTANCE)
1356
    self.GL.release(locking.LEVEL_CLUSTER)
1357

    
1358
  def testBGLDependency(self):
1359
    self.assertRaises(AssertionError, self.GL.acquire,
1360
                      locking.LEVEL_NODE, ['n1', 'n2'])
1361
    self.assertRaises(AssertionError, self.GL.acquire,
1362
                      locking.LEVEL_INSTANCE, ['i3'])
1363
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1364
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1365
    self.assertRaises(AssertionError, self.GL.release,
1366
                      locking.LEVEL_CLUSTER, ['BGL'])
1367
    self.assertRaises(AssertionError, self.GL.release,
1368
                      locking.LEVEL_CLUSTER)
1369
    self.GL.release(locking.LEVEL_NODE)
1370
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1371
    self.assertRaises(AssertionError, self.GL.release,
1372
                      locking.LEVEL_CLUSTER, ['BGL'])
1373
    self.assertRaises(AssertionError, self.GL.release,
1374
                      locking.LEVEL_CLUSTER)
1375
    self.GL.release(locking.LEVEL_INSTANCE)
1376

    
1377
  def testWrongOrder(self):
1378
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1379
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1380
    self.assertRaises(AssertionError, self.GL.acquire,
1381
                      locking.LEVEL_NODE, ['n1'])
1382
    self.assertRaises(AssertionError, self.GL.acquire,
1383
                      locking.LEVEL_INSTANCE, ['i2'])
1384

    
1385
  # Helper function to run as a thread that shared the BGL and then acquires
1386
  # some locks at another level.
1387
  def _doLock(self, level, names, shared):
1388
    try:
1389
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1390
      self.GL.acquire(level, names, shared=shared)
1391
      self.done.put('DONE')
1392
      self.GL.release(level)
1393
      self.GL.release(locking.LEVEL_CLUSTER)
1394
    except errors.LockError:
1395
      self.done.put('ERR')
1396

    
1397
  @_Repeat
1398
  def testConcurrency(self):
1399
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1400
    self._addThread(target=self._doLock,
1401
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1402
    self._waitThreads()
1403
    self.assertEqual(self.done.get_nowait(), 'DONE')
1404
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1405
    self._addThread(target=self._doLock,
1406
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1407
    self._waitThreads()
1408
    self.assertEqual(self.done.get_nowait(), 'DONE')
1409
    self._addThread(target=self._doLock,
1410
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1411
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1412
    self.GL.release(locking.LEVEL_INSTANCE)
1413
    self._waitThreads()
1414
    self.assertEqual(self.done.get_nowait(), 'DONE')
1415
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1416
    self._addThread(target=self._doLock,
1417
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1418
    self._waitThreads()
1419
    self.assertEqual(self.done.get_nowait(), 'DONE')
1420
    self._addThread(target=self._doLock,
1421
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1423
    self.GL.release(locking.LEVEL_INSTANCE)
1424
    self._waitThreads()
1425
    self.assertEqual(self.done.get(True, 1), 'DONE')
1426
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1427

    
1428

    
1429
class TestLockMonitor(_ThreadedTestCase):
1430
  def setUp(self):
1431
    _ThreadedTestCase.setUp(self)
1432
    self.lm = locking.LockMonitor()
1433

    
1434
  def testSingleThread(self):
1435
    locks = []
1436

    
1437
    for i in range(100):
1438
      name = "TestLock%s" % i
1439
      locks.append(locking.SharedLock(name, monitor=self.lm))
1440

    
1441
    self.assertEqual(len(self.lm._locks), len(locks))
1442

    
1443
    self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
1444
                     100)
1445

    
1446
    # Delete all locks
1447
    del locks[:]
1448

    
1449
    # The garbage collector might needs some time
1450
    def _CheckLocks():
1451
      if self.lm._locks:
1452
        raise utils.RetryAgain()
1453

    
1454
    utils.Retry(_CheckLocks, 0.1, 30.0)
1455

    
1456
    self.assertFalse(self.lm._locks)
1457

    
1458
  def testMultiThread(self):
1459
    locks = []
1460

    
1461
    def _CreateLock(prev, next, name):
1462
      prev.wait()
1463
      locks.append(locking.SharedLock(name, monitor=self.lm))
1464
      if next:
1465
        next.set()
1466

    
1467
    expnames = []
1468

    
1469
    first = threading.Event()
1470
    prev = first
1471

    
1472
    # Use a deterministic random generator
1473
    for i in random.Random(4263).sample(range(100), 33):
1474
      name = "MtTestLock%s" % i
1475
      expnames.append(name)
1476

    
1477
      ev = threading.Event()
1478
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1479
      prev = ev
1480

    
1481
    # Add locks
1482
    first.set()
1483
    self._waitThreads()
1484

    
1485
    # Check order in which locks were added
1486
    self.assertEqual([i.name for i in locks], expnames)
1487

    
1488
    # Sync queries are not supported
1489
    self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1490

    
1491
    # Check query result
1492
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1493
                                        False),
1494
                     [[name, None, None, []]
1495
                      for name in utils.NiceSort(expnames)])
1496

    
1497
    # Test exclusive acquire
1498
    for tlock in locks[::4]:
1499
      tlock.acquire(shared=0)
1500
      try:
1501
        def _GetExpResult(name):
1502
          if tlock.name == name:
1503
            return [name, "exclusive", [threading.currentThread().getName()],
1504
                    []]
1505
          return [name, None, None, []]
1506

    
1507
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1508
                                             "pending"], False),
1509
                         [_GetExpResult(name)
1510
                          for name in utils.NiceSort(expnames)])
1511
      finally:
1512
        tlock.release()
1513

    
1514
    # Test shared acquire
1515
    def _Acquire(lock, shared, ev, notify):
1516
      lock.acquire(shared=shared)
1517
      try:
1518
        notify.set()
1519
        ev.wait()
1520
      finally:
1521
        lock.release()
1522

    
1523
    for tlock1 in locks[::11]:
1524
      for tlock2 in locks[::-15]:
1525
        if tlock2 == tlock1:
1526
          # Avoid deadlocks
1527
          continue
1528

    
1529
        for tlock3 in locks[::10]:
1530
          if tlock3 in (tlock2, tlock1):
1531
            # Avoid deadlocks
1532
            continue
1533

    
1534
          releaseev = threading.Event()
1535

    
1536
          # Acquire locks
1537
          acquireev = []
1538
          tthreads1 = []
1539
          for i in range(3):
1540
            ev = threading.Event()
1541
            tthreads1.append(self._addThread(target=_Acquire,
1542
                                             args=(tlock1, 1, releaseev, ev)))
1543
            acquireev.append(ev)
1544

    
1545
          ev = threading.Event()
1546
          tthread2 = self._addThread(target=_Acquire,
1547
                                     args=(tlock2, 1, releaseev, ev))
1548
          acquireev.append(ev)
1549

    
1550
          ev = threading.Event()
1551
          tthread3 = self._addThread(target=_Acquire,
1552
                                     args=(tlock3, 0, releaseev, ev))
1553
          acquireev.append(ev)
1554

    
1555
          # Wait for all locks to be acquired
1556
          for i in acquireev:
1557
            i.wait()
1558

    
1559
          # Check query result
1560
          for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1561
                                                         "owner"], False):
1562
            if name == tlock1.name:
1563
              self.assertEqual(mode, "shared")
1564
              self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1565
              continue
1566

    
1567
            if name == tlock2.name:
1568
              self.assertEqual(mode, "shared")
1569
              self.assertEqual(owner, [tthread2.getName()])
1570
              continue
1571

    
1572
            if name == tlock3.name:
1573
              self.assertEqual(mode, "exclusive")
1574
              self.assertEqual(owner, [tthread3.getName()])
1575
              continue
1576

    
1577
            self.assert_(name in expnames)
1578
            self.assert_(mode is None)
1579
            self.assert_(owner is None)
1580

    
1581
          # Release locks again
1582
          releaseev.set()
1583

    
1584
          self._waitThreads()
1585

    
1586
          self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1587
                           [[name, None, None]
1588
                            for name in utils.NiceSort(expnames)])
1589

    
1590
  def testDelete(self):
1591
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1592

    
1593
    self.assertEqual(len(self.lm._locks), 1)
1594
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1595
                     [[lock.name, None, None]])
1596

    
1597
    lock.delete()
1598

    
1599
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1600
                     [[lock.name, "deleted", None]])
1601
    self.assertEqual(len(self.lm._locks), 1)
1602

    
1603
  def testPending(self):
1604
    def _Acquire(lock, shared, prev, next):
1605
      prev.wait()
1606

    
1607
      lock.acquire(shared=shared, test_notify=next.set)
1608
      try:
1609
        pass
1610
      finally:
1611
        lock.release()
1612

    
1613
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
1614

    
1615
    for shared in [0, 1]:
1616
      lock.acquire()
1617
      try:
1618
        self.assertEqual(len(self.lm._locks), 1)
1619
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1620
                         [[lock.name, "exclusive",
1621
                           [threading.currentThread().getName()]]])
1622

    
1623
        threads = []
1624

    
1625
        first = threading.Event()
1626
        prev = first
1627

    
1628
        for i in range(5):
1629
          ev = threading.Event()
1630
          threads.append(self._addThread(target=_Acquire,
1631
                                          args=(lock, shared, prev, ev)))
1632
          prev = ev
1633

    
1634
        # Start acquires
1635
        first.set()
1636

    
1637
        # Wait for last acquire to start waiting
1638
        prev.wait()
1639

    
1640
        # NOTE: This works only because QueryLocks will acquire the
1641
        # lock-internal lock again and won't be able to get the information
1642
        # until it has the lock. By then the acquire should be registered in
1643
        # SharedLock.__pending (otherwise it's a bug).
1644

    
1645
        # All acquires are waiting now
1646
        if shared:
1647
          pending = [("shared", sorted([t.getName() for t in threads]))]
1648
        else:
1649
          pending = [("exclusive", [t.getName()]) for t in threads]
1650

    
1651
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1652
                                             "pending"], False),
1653
                         [[lock.name, "exclusive",
1654
                           [threading.currentThread().getName()],
1655
                           pending]])
1656

    
1657
        self.assertEqual(len(self.lm._locks), 1)
1658
      finally:
1659
        lock.release()
1660

    
1661
      self._waitThreads()
1662

    
1663
      # No pending acquires
1664
      self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1665
                                          False),
1666
                       [[lock.name, None, None, []]])
1667

    
1668
      self.assertEqual(len(self.lm._locks), 1)
1669

    
1670

    
1671
if __name__ == '__main__':
1672
  testutils.GanetiTestProgram()