Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 887c7aa6

History | View | Annotate | Download (58.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import itertools
32

    
33
from ganeti import locking
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import compat
37

    
38
import testutils
39

    
40

    
41
# This is used to test the ssynchronize decorator.
42
# Since it's passed as input to a decorator it must be declared as a global.
43
_decoratorlock = locking.SharedLock("decorator lock")
44

    
45
#: List for looping tests
46
ITERATIONS = range(8)
47

    
48

    
49
def _Repeat(fn):
50
  """Decorator for executing a function many times"""
51
  def wrapper(*args, **kwargs):
52
    for i in ITERATIONS:
53
      fn(*args, **kwargs)
54
  return wrapper
55

    
56

    
57
def SafeSleep(duration):
58
  start = time.time()
59
  while True:
60
    delay = start + duration - time.time()
61
    if delay <= 0.0:
62
      break
63
    time.sleep(delay)
64

    
65

    
66
class _ThreadedTestCase(unittest.TestCase):
67
  """Test class that supports adding/waiting on threads"""
68
  def setUp(self):
69
    unittest.TestCase.setUp(self)
70
    self.done = Queue.Queue(0)
71
    self.threads = []
72

    
73
  def _addThread(self, *args, **kwargs):
74
    """Create and remember a new thread"""
75
    t = threading.Thread(*args, **kwargs)
76
    self.threads.append(t)
77
    t.start()
78
    return t
79

    
80
  def _waitThreads(self):
81
    """Wait for all our threads to finish"""
82
    for t in self.threads:
83
      t.join(60)
84
      self.failIf(t.isAlive())
85
    self.threads = []
86

    
87

    
88
class _ConditionTestCase(_ThreadedTestCase):
89
  """Common test case for conditions"""
90

    
91
  def setUp(self, cls):
92
    _ThreadedTestCase.setUp(self)
93
    self.lock = threading.Lock()
94
    self.cond = cls(self.lock)
95

    
96
  def _testAcquireRelease(self):
97
    self.assertFalse(self.cond._is_owned())
98
    self.assertRaises(RuntimeError, self.cond.wait)
99
    self.assertRaises(RuntimeError, self.cond.notifyAll)
100

    
101
    self.cond.acquire()
102
    self.assert_(self.cond._is_owned())
103
    self.cond.notifyAll()
104
    self.assert_(self.cond._is_owned())
105
    self.cond.release()
106

    
107
    self.assertFalse(self.cond._is_owned())
108
    self.assertRaises(RuntimeError, self.cond.wait)
109
    self.assertRaises(RuntimeError, self.cond.notifyAll)
110

    
111
  def _testNotification(self):
112
    def _NotifyAll():
113
      self.done.put("NE")
114
      self.cond.acquire()
115
      self.done.put("NA")
116
      self.cond.notifyAll()
117
      self.done.put("NN")
118
      self.cond.release()
119

    
120
    self.cond.acquire()
121
    self._addThread(target=_NotifyAll)
122
    self.assertEqual(self.done.get(True, 1), "NE")
123
    self.assertRaises(Queue.Empty, self.done.get_nowait)
124
    self.cond.wait()
125
    self.assertEqual(self.done.get(True, 1), "NA")
126
    self.assertEqual(self.done.get(True, 1), "NN")
127
    self.assert_(self.cond._is_owned())
128
    self.cond.release()
129
    self.assertFalse(self.cond._is_owned())
130

    
131

    
132
class TestSingleNotifyPipeCondition(_ConditionTestCase):
133
  """SingleNotifyPipeCondition tests"""
134

    
135
  def setUp(self):
136
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
137

    
138
  def testAcquireRelease(self):
139
    self._testAcquireRelease()
140

    
141
  def testNotification(self):
142
    self._testNotification()
143

    
144
  def testWaitReuse(self):
145
    self.cond.acquire()
146
    self.cond.wait(0)
147
    self.cond.wait(0.1)
148
    self.cond.release()
149

    
150
  def testNoNotifyReuse(self):
151
    self.cond.acquire()
152
    self.cond.notifyAll()
153
    self.assertRaises(RuntimeError, self.cond.wait)
154
    self.assertRaises(RuntimeError, self.cond.notifyAll)
155
    self.cond.release()
156

    
157

    
158
class TestPipeCondition(_ConditionTestCase):
159
  """PipeCondition tests"""
160

    
161
  def setUp(self):
162
    _ConditionTestCase.setUp(self, locking.PipeCondition)
163

    
164
  def testAcquireRelease(self):
165
    self._testAcquireRelease()
166

    
167
  def testNotification(self):
168
    self._testNotification()
169

    
170
  def _TestWait(self, fn):
171
    threads = [
172
      self._addThread(target=fn),
173
      self._addThread(target=fn),
174
      self._addThread(target=fn),
175
      ]
176

    
177
    # Wait for threads to be waiting
178
    for _ in threads:
179
      self.assertEqual(self.done.get(True, 1), "A")
180

    
181
    self.assertRaises(Queue.Empty, self.done.get_nowait)
182

    
183
    self.cond.acquire()
184
    self.assertEqual(len(self.cond._waiters), 3)
185
    self.assertEqual(self.cond._waiters, set(threads))
186
    # This new thread can't acquire the lock, and thus call wait, before we
187
    # release it
188
    self._addThread(target=fn)
189
    self.cond.notifyAll()
190
    self.assertRaises(Queue.Empty, self.done.get_nowait)
191
    self.cond.release()
192

    
193
    # We should now get 3 W and 1 A (for the new thread) in whatever order
194
    w = 0
195
    a = 0
196
    for i in range(4):
197
      got = self.done.get(True, 1)
198
      if got == "W":
199
        w += 1
200
      elif got == "A":
201
        a += 1
202
      else:
203
        self.fail("Got %s on the done queue" % got)
204

    
205
    self.assertEqual(w, 3)
206
    self.assertEqual(a, 1)
207

    
208
    self.cond.acquire()
209
    self.cond.notifyAll()
210
    self.cond.release()
211
    self._waitThreads()
212
    self.assertEqual(self.done.get_nowait(), "W")
213
    self.assertRaises(Queue.Empty, self.done.get_nowait)
214

    
215
  def testBlockingWait(self):
216
    def _BlockingWait():
217
      self.cond.acquire()
218
      self.done.put("A")
219
      self.cond.wait()
220
      self.cond.release()
221
      self.done.put("W")
222

    
223
    self._TestWait(_BlockingWait)
224

    
225
  def testLongTimeoutWait(self):
226
    def _Helper():
227
      self.cond.acquire()
228
      self.done.put("A")
229
      self.cond.wait(15.0)
230
      self.cond.release()
231
      self.done.put("W")
232

    
233
    self._TestWait(_Helper)
234

    
235
  def _TimeoutWait(self, timeout, check):
236
    self.cond.acquire()
237
    self.cond.wait(timeout)
238
    self.cond.release()
239
    self.done.put(check)
240

    
241
  def testShortTimeoutWait(self):
242
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
243
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
244
    self._waitThreads()
245
    self.assertEqual(self.done.get_nowait(), "T1")
246
    self.assertEqual(self.done.get_nowait(), "T1")
247
    self.assertRaises(Queue.Empty, self.done.get_nowait)
248

    
249
  def testZeroTimeoutWait(self):
250
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
251
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
252
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
253
    self._waitThreads()
254
    self.assertEqual(self.done.get_nowait(), "T0")
255
    self.assertEqual(self.done.get_nowait(), "T0")
256
    self.assertEqual(self.done.get_nowait(), "T0")
257
    self.assertRaises(Queue.Empty, self.done.get_nowait)
258

    
259

    
260
class TestSharedLock(_ThreadedTestCase):
261
  """SharedLock tests"""
262

    
263
  def setUp(self):
264
    _ThreadedTestCase.setUp(self)
265
    self.sl = locking.SharedLock("TestSharedLock")
266

    
267
  def testSequenceAndOwnership(self):
268
    self.assertFalse(self.sl._is_owned())
269
    self.sl.acquire(shared=1)
270
    self.assert_(self.sl._is_owned())
271
    self.assert_(self.sl._is_owned(shared=1))
272
    self.assertFalse(self.sl._is_owned(shared=0))
273
    self.sl.release()
274
    self.assertFalse(self.sl._is_owned())
275
    self.sl.acquire()
276
    self.assert_(self.sl._is_owned())
277
    self.assertFalse(self.sl._is_owned(shared=1))
278
    self.assert_(self.sl._is_owned(shared=0))
279
    self.sl.release()
280
    self.assertFalse(self.sl._is_owned())
281
    self.sl.acquire(shared=1)
282
    self.assert_(self.sl._is_owned())
283
    self.assert_(self.sl._is_owned(shared=1))
284
    self.assertFalse(self.sl._is_owned(shared=0))
285
    self.sl.release()
286
    self.assertFalse(self.sl._is_owned())
287

    
288
  def testBooleanValue(self):
289
    # semaphores are supposed to return a true value on a successful acquire
290
    self.assert_(self.sl.acquire(shared=1))
291
    self.sl.release()
292
    self.assert_(self.sl.acquire())
293
    self.sl.release()
294

    
295
  def testDoubleLockingStoE(self):
296
    self.sl.acquire(shared=1)
297
    self.assertRaises(AssertionError, self.sl.acquire)
298

    
299
  def testDoubleLockingEtoS(self):
300
    self.sl.acquire()
301
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
302

    
303
  def testDoubleLockingStoS(self):
304
    self.sl.acquire(shared=1)
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingEtoE(self):
308
    self.sl.acquire()
309
    self.assertRaises(AssertionError, self.sl.acquire)
310

    
311
  # helper functions: called in a separate thread they acquire the lock, send
312
  # their identifier on the done queue, then release it.
313
  def _doItSharer(self):
314
    try:
315
      self.sl.acquire(shared=1)
316
      self.done.put('SHR')
317
      self.sl.release()
318
    except errors.LockError:
319
      self.done.put('ERR')
320

    
321
  def _doItExclusive(self):
322
    try:
323
      self.sl.acquire()
324
      self.done.put('EXC')
325
      self.sl.release()
326
    except errors.LockError:
327
      self.done.put('ERR')
328

    
329
  def _doItDelete(self):
330
    try:
331
      self.sl.delete()
332
      self.done.put('DEL')
333
    except errors.LockError:
334
      self.done.put('ERR')
335

    
336
  def testSharersCanCoexist(self):
337
    self.sl.acquire(shared=1)
338
    threading.Thread(target=self._doItSharer).start()
339
    self.assert_(self.done.get(True, 1))
340
    self.sl.release()
341

    
342
  @_Repeat
343
  def testExclusiveBlocksExclusive(self):
344
    self.sl.acquire()
345
    self._addThread(target=self._doItExclusive)
346
    self.assertRaises(Queue.Empty, self.done.get_nowait)
347
    self.sl.release()
348
    self._waitThreads()
349
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
350

    
351
  @_Repeat
352
  def testExclusiveBlocksDelete(self):
353
    self.sl.acquire()
354
    self._addThread(target=self._doItDelete)
355
    self.assertRaises(Queue.Empty, self.done.get_nowait)
356
    self.sl.release()
357
    self._waitThreads()
358
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
359
    self.sl = locking.SharedLock(self.sl.name)
360

    
361
  @_Repeat
362
  def testExclusiveBlocksSharer(self):
363
    self.sl.acquire()
364
    self._addThread(target=self._doItSharer)
365
    self.assertRaises(Queue.Empty, self.done.get_nowait)
366
    self.sl.release()
367
    self._waitThreads()
368
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
369

    
370
  @_Repeat
371
  def testSharerBlocksExclusive(self):
372
    self.sl.acquire(shared=1)
373
    self._addThread(target=self._doItExclusive)
374
    self.assertRaises(Queue.Empty, self.done.get_nowait)
375
    self.sl.release()
376
    self._waitThreads()
377
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
378

    
379
  @_Repeat
380
  def testSharerBlocksDelete(self):
381
    self.sl.acquire(shared=1)
382
    self._addThread(target=self._doItDelete)
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384
    self.sl.release()
385
    self._waitThreads()
386
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
387
    self.sl = locking.SharedLock(self.sl.name)
388

    
389
  @_Repeat
390
  def testWaitingExclusiveBlocksSharer(self):
391
    """SKIPPED testWaitingExclusiveBlockSharer"""
392
    return
393

    
394
    self.sl.acquire(shared=1)
395
    # the lock is acquired in shared mode...
396
    self._addThread(target=self._doItExclusive)
397
    # ...but now an exclusive is waiting...
398
    self._addThread(target=self._doItSharer)
399
    # ...so the sharer should be blocked as well
400
    self.assertRaises(Queue.Empty, self.done.get_nowait)
401
    self.sl.release()
402
    self._waitThreads()
403
    # The exclusive passed before
404
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
405
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
406

    
407
  @_Repeat
408
  def testWaitingSharerBlocksExclusive(self):
409
    """SKIPPED testWaitingSharerBlocksExclusive"""
410
    return
411

    
412
    self.sl.acquire()
413
    # the lock is acquired in exclusive mode...
414
    self._addThread(target=self._doItSharer)
415
    # ...but now a sharer is waiting...
416
    self._addThread(target=self._doItExclusive)
417
    # ...the exclusive is waiting too...
418
    self.assertRaises(Queue.Empty, self.done.get_nowait)
419
    self.sl.release()
420
    self._waitThreads()
421
    # The sharer passed before
422
    self.assertEqual(self.done.get_nowait(), 'SHR')
423
    self.assertEqual(self.done.get_nowait(), 'EXC')
424

    
425
  def testDelete(self):
426
    self.sl.delete()
427
    self.assertRaises(errors.LockError, self.sl.acquire)
428
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
429
    self.assertRaises(errors.LockError, self.sl.delete)
430

    
431
  def testDeleteTimeout(self):
432
    self.sl.delete(timeout=60)
433

    
434
  def testNoDeleteIfSharer(self):
435
    self.sl.acquire(shared=1)
436
    self.assertRaises(AssertionError, self.sl.delete)
437

    
438
  @_Repeat
439
  def testDeletePendingSharersExclusiveDelete(self):
440
    self.sl.acquire()
441
    self._addThread(target=self._doItSharer)
442
    self._addThread(target=self._doItSharer)
443
    self._addThread(target=self._doItExclusive)
444
    self._addThread(target=self._doItDelete)
445
    self.sl.delete()
446
    self._waitThreads()
447
    # The threads who were pending return ERR
448
    for _ in range(4):
449
      self.assertEqual(self.done.get_nowait(), 'ERR')
450
    self.sl = locking.SharedLock(self.sl.name)
451

    
452
  @_Repeat
453
  def testDeletePendingDeleteExclusiveSharers(self):
454
    self.sl.acquire()
455
    self._addThread(target=self._doItDelete)
456
    self._addThread(target=self._doItExclusive)
457
    self._addThread(target=self._doItSharer)
458
    self._addThread(target=self._doItSharer)
459
    self.sl.delete()
460
    self._waitThreads()
461
    # The two threads who were pending return both ERR
462
    self.assertEqual(self.done.get_nowait(), 'ERR')
463
    self.assertEqual(self.done.get_nowait(), 'ERR')
464
    self.assertEqual(self.done.get_nowait(), 'ERR')
465
    self.assertEqual(self.done.get_nowait(), 'ERR')
466
    self.sl = locking.SharedLock(self.sl.name)
467

    
468
  @_Repeat
469
  def testExclusiveAcquireTimeout(self):
470
    for shared in [0, 1]:
471
      on_queue = threading.Event()
472
      release_exclusive = threading.Event()
473

    
474
      def _LockExclusive():
475
        self.sl.acquire(shared=0, test_notify=on_queue.set)
476
        self.done.put("A: start wait")
477
        release_exclusive.wait()
478
        self.done.put("A: end wait")
479
        self.sl.release()
480

    
481
      # Start thread to hold lock in exclusive mode
482
      self._addThread(target=_LockExclusive)
483

    
484
      # Wait for wait to begin
485
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
486

    
487
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
488
      # on the queue
489
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
490
                                      test_notify=release_exclusive.set))
491

    
492
      self.done.put("got 2nd")
493
      self.sl.release()
494

    
495
      self._waitThreads()
496

    
497
      self.assertEqual(self.done.get_nowait(), "A: end wait")
498
      self.assertEqual(self.done.get_nowait(), "got 2nd")
499
      self.assertRaises(Queue.Empty, self.done.get_nowait)
500

    
501
  @_Repeat
502
  def testAcquireExpiringTimeout(self):
503
    def _AcquireWithTimeout(shared, timeout):
504
      if not self.sl.acquire(shared=shared, timeout=timeout):
505
        self.done.put("timeout")
506

    
507
    for shared in [0, 1]:
508
      # Lock exclusively
509
      self.sl.acquire()
510

    
511
      # Start shared acquires with timeout between 0 and 20 ms
512
      for i in range(11):
513
        self._addThread(target=_AcquireWithTimeout,
514
                        args=(shared, i * 2.0 / 1000.0))
515

    
516
      # Wait for threads to finish (makes sure the acquire timeout expires
517
      # before releasing the lock)
518
      self._waitThreads()
519

    
520
      # Release lock
521
      self.sl.release()
522

    
523
      for _ in range(11):
524
        self.assertEqual(self.done.get_nowait(), "timeout")
525

    
526
      self.assertRaises(Queue.Empty, self.done.get_nowait)
527

    
528
  @_Repeat
529
  def testSharedSkipExclusiveAcquires(self):
530
    # Tests whether shared acquires jump in front of exclusive acquires in the
531
    # queue.
532

    
533
    def _Acquire(shared, name, notify_ev, wait_ev):
534
      if notify_ev:
535
        notify_fn = notify_ev.set
536
      else:
537
        notify_fn = None
538

    
539
      if wait_ev:
540
        wait_ev.wait()
541

    
542
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
543
        return
544

    
545
      self.done.put(name)
546
      self.sl.release()
547

    
548
    # Get exclusive lock while we fill the queue
549
    self.sl.acquire()
550

    
551
    shrcnt1 = 5
552
    shrcnt2 = 7
553
    shrcnt3 = 9
554
    shrcnt4 = 2
555

    
556
    # Add acquires using threading.Event for synchronization. They'll be
557
    # acquired exactly in the order defined in this list.
558
    acquires = (shrcnt1 * [(1, "shared 1")] +
559
                3 * [(0, "exclusive 1")] +
560
                shrcnt2 * [(1, "shared 2")] +
561
                shrcnt3 * [(1, "shared 3")] +
562
                shrcnt4 * [(1, "shared 4")] +
563
                3 * [(0, "exclusive 2")])
564

    
565
    ev_cur = None
566
    ev_prev = None
567

    
568
    for args in acquires:
569
      ev_cur = threading.Event()
570
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
571
      ev_prev = ev_cur
572

    
573
    # Wait for last acquire to start
574
    ev_prev.wait()
575

    
576
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
577
    # together
578
    self.assertEqual(self.sl._count_pending(), 7)
579

    
580
    # Release exclusive lock and wait
581
    self.sl.release()
582

    
583
    self._waitThreads()
584

    
585
    # Check sequence
586
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
587
      # Shared locks aren't guaranteed to be notified in order, but they'll be
588
      # first
589
      tmp = self.done.get_nowait()
590
      if tmp == "shared 1":
591
        shrcnt1 -= 1
592
      elif tmp == "shared 2":
593
        shrcnt2 -= 1
594
      elif tmp == "shared 3":
595
        shrcnt3 -= 1
596
      elif tmp == "shared 4":
597
        shrcnt4 -= 1
598
    self.assertEqual(shrcnt1, 0)
599
    self.assertEqual(shrcnt2, 0)
600
    self.assertEqual(shrcnt3, 0)
601
    self.assertEqual(shrcnt3, 0)
602

    
603
    for _ in range(3):
604
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
605

    
606
    for _ in range(3):
607
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
608

    
609
    self.assertRaises(Queue.Empty, self.done.get_nowait)
610

    
611
  @_Repeat
612
  def testMixedAcquireTimeout(self):
613
    sync = threading.Event()
614

    
615
    def _AcquireShared(ev):
616
      if not self.sl.acquire(shared=1, timeout=None):
617
        return
618

    
619
      self.done.put("shared")
620

    
621
      # Notify main thread
622
      ev.set()
623

    
624
      # Wait for notification from main thread
625
      sync.wait()
626

    
627
      # Release lock
628
      self.sl.release()
629

    
630
    acquires = []
631
    for _ in range(3):
632
      ev = threading.Event()
633
      self._addThread(target=_AcquireShared, args=(ev, ))
634
      acquires.append(ev)
635

    
636
    # Wait for all acquires to finish
637
    for i in acquires:
638
      i.wait()
639

    
640
    self.assertEqual(self.sl._count_pending(), 0)
641

    
642
    # Try to get exclusive lock
643
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
644

    
645
    # Acquire exclusive without timeout
646
    exclsync = threading.Event()
647
    exclev = threading.Event()
648

    
649
    def _AcquireExclusive():
650
      if not self.sl.acquire(shared=0):
651
        return
652

    
653
      self.done.put("exclusive")
654

    
655
      # Notify main thread
656
      exclev.set()
657

    
658
      # Wait for notification from main thread
659
      exclsync.wait()
660

    
661
      self.sl.release()
662

    
663
    self._addThread(target=_AcquireExclusive)
664

    
665
    # Try to get exclusive lock
666
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
667

    
668
    # Make all shared holders release their locks
669
    sync.set()
670

    
671
    # Wait for exclusive acquire to succeed
672
    exclev.wait()
673

    
674
    self.assertEqual(self.sl._count_pending(), 0)
675

    
676
    # Try to get exclusive lock
677
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
678

    
679
    def _AcquireSharedSimple():
680
      if self.sl.acquire(shared=1, timeout=None):
681
        self.done.put("shared2")
682
        self.sl.release()
683

    
684
    for _ in range(10):
685
      self._addThread(target=_AcquireSharedSimple)
686

    
687
    # Tell exclusive lock to release
688
    exclsync.set()
689

    
690
    # Wait for everything to finish
691
    self._waitThreads()
692

    
693
    self.assertEqual(self.sl._count_pending(), 0)
694

    
695
    # Check sequence
696
    for _ in range(3):
697
      self.assertEqual(self.done.get_nowait(), "shared")
698

    
699
    self.assertEqual(self.done.get_nowait(), "exclusive")
700

    
701
    for _ in range(10):
702
      self.assertEqual(self.done.get_nowait(), "shared2")
703

    
704
    self.assertRaises(Queue.Empty, self.done.get_nowait)
705

    
706
  def testPriority(self):
707
    # Acquire in exclusive mode
708
    self.assert_(self.sl.acquire(shared=0))
709

    
710
    # Queue acquires
711
    def _Acquire(prev, next, shared, priority, result):
712
      prev.wait()
713
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
714
      try:
715
        self.done.put(result)
716
      finally:
717
        self.sl.release()
718

    
719
    counter = itertools.count(0)
720
    priorities = range(-20, 30)
721
    first = threading.Event()
722
    prev = first
723

    
724
    # Data structure:
725
    # {
726
    #   priority:
727
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
728
    #      (shared/exclusive, ...),
729
    #      ...,
730
    #     ],
731
    # }
732
    perprio = {}
733

    
734
    # References shared acquire per priority in L{perprio}. Data structure:
735
    # {
736
    #   priority: (shared=1, set(acquire names), set(pending threads)),
737
    # }
738
    prioshared = {}
739

    
740
    for seed in [4979, 9523, 14902, 32440]:
741
      # Use a deterministic random generator
742
      rnd = random.Random(seed)
743
      for priority in [rnd.choice(priorities) for _ in range(30)]:
744
        modes = [0, 1]
745
        rnd.shuffle(modes)
746
        for shared in modes:
747
          # Unique name
748
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
749

    
750
          ev = threading.Event()
751
          thread = self._addThread(target=_Acquire,
752
                                   args=(prev, ev, shared, priority, acqname))
753
          prev = ev
754

    
755
          # Record expected aqcuire, see above for structure
756
          data = (shared, set([acqname]), set([thread]))
757
          priolist = perprio.setdefault(priority, [])
758
          if shared:
759
            priosh = prioshared.get(priority, None)
760
            if priosh:
761
              # Shared acquires are merged
762
              for i, j in zip(priosh[1:], data[1:]):
763
                i.update(j)
764
              assert data[0] == priosh[0]
765
            else:
766
              prioshared[priority] = data
767
              priolist.append(data)
768
          else:
769
            priolist.append(data)
770

    
771
    # Start all acquires and wait for them
772
    first.set()
773
    prev.wait()
774

    
775
    # Check lock information
776
    self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
777
    self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
778
                     ["exclusive", [threading.currentThread().getName()]])
779
    self.assertEqual(self.sl.GetInfo(["name", "pending"]),
780
                     [self.sl.name,
781
                      [(["exclusive", "shared"][int(bool(shared))],
782
                        sorted([t.getName() for t in threads]))
783
                       for acquires in [perprio[i]
784
                                        for i in sorted(perprio.keys())]
785
                       for (shared, _, threads) in acquires]])
786

    
787
    # Let threads acquire the lock
788
    self.sl.release()
789

    
790
    # Wait for everything to finish
791
    self._waitThreads()
792

    
793
    self.assert_(self.sl._check_empty())
794

    
795
    # Check acquires by priority
796
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797
      for (_, names, _) in acquires:
798
        # For shared acquires, the set will contain 1..n entries. For exclusive
799
        # acquires only one.
800
        while names:
801
          names.remove(self.done.get_nowait())
802
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
803

    
804
    self.assertRaises(Queue.Empty, self.done.get_nowait)
805

    
806

    
807
class TestSharedLockInCondition(_ThreadedTestCase):
808
  """SharedLock as a condition lock tests"""
809

    
810
  def setUp(self):
811
    _ThreadedTestCase.setUp(self)
812
    self.sl = locking.SharedLock("TestSharedLockInCondition")
813
    self.setCondition()
814

    
815
  def setCondition(self):
816
    self.cond = threading.Condition(self.sl)
817

    
818
  def testKeepMode(self):
819
    self.cond.acquire(shared=1)
820
    self.assert_(self.sl._is_owned(shared=1))
821
    self.cond.wait(0)
822
    self.assert_(self.sl._is_owned(shared=1))
823
    self.cond.release()
824
    self.cond.acquire(shared=0)
825
    self.assert_(self.sl._is_owned(shared=0))
826
    self.cond.wait(0)
827
    self.assert_(self.sl._is_owned(shared=0))
828
    self.cond.release()
829

    
830

    
831
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
832
  """SharedLock as a pipe condition lock tests"""
833

    
834
  def setCondition(self):
835
    self.cond = locking.PipeCondition(self.sl)
836

    
837

    
838
class TestSSynchronizedDecorator(_ThreadedTestCase):
839
  """Shared Lock Synchronized decorator test"""
840

    
841
  def setUp(self):
842
    _ThreadedTestCase.setUp(self)
843

    
844
  @locking.ssynchronized(_decoratorlock)
845
  def _doItExclusive(self):
846
    self.assert_(_decoratorlock._is_owned())
847
    self.done.put('EXC')
848

    
849
  @locking.ssynchronized(_decoratorlock, shared=1)
850
  def _doItSharer(self):
851
    self.assert_(_decoratorlock._is_owned(shared=1))
852
    self.done.put('SHR')
853

    
854
  def testDecoratedFunctions(self):
855
    self._doItExclusive()
856
    self.assertFalse(_decoratorlock._is_owned())
857
    self._doItSharer()
858
    self.assertFalse(_decoratorlock._is_owned())
859

    
860
  def testSharersCanCoexist(self):
861
    _decoratorlock.acquire(shared=1)
862
    threading.Thread(target=self._doItSharer).start()
863
    self.assert_(self.done.get(True, 1))
864
    _decoratorlock.release()
865

    
866
  @_Repeat
867
  def testExclusiveBlocksExclusive(self):
868
    _decoratorlock.acquire()
869
    self._addThread(target=self._doItExclusive)
870
    # give it a bit of time to check that it's not actually doing anything
871
    self.assertRaises(Queue.Empty, self.done.get_nowait)
872
    _decoratorlock.release()
873
    self._waitThreads()
874
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
875

    
876
  @_Repeat
877
  def testExclusiveBlocksSharer(self):
878
    _decoratorlock.acquire()
879
    self._addThread(target=self._doItSharer)
880
    self.assertRaises(Queue.Empty, self.done.get_nowait)
881
    _decoratorlock.release()
882
    self._waitThreads()
883
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
884

    
885
  @_Repeat
886
  def testSharerBlocksExclusive(self):
887
    _decoratorlock.acquire(shared=1)
888
    self._addThread(target=self._doItExclusive)
889
    self.assertRaises(Queue.Empty, self.done.get_nowait)
890
    _decoratorlock.release()
891
    self._waitThreads()
892
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
893

    
894

    
895
class TestLockSet(_ThreadedTestCase):
896
  """LockSet tests"""
897

    
898
  def setUp(self):
899
    _ThreadedTestCase.setUp(self)
900
    self._setUpLS()
901

    
902
  def _setUpLS(self):
903
    """Helper to (re)initialize the lock set"""
904
    self.resources = ['one', 'two', 'three']
905
    self.ls = locking.LockSet(self.resources, "TestLockSet")
906

    
907
  def testResources(self):
908
    self.assertEquals(self.ls._names(), set(self.resources))
909
    newls = locking.LockSet([], "TestLockSet.testResources")
910
    self.assertEquals(newls._names(), set())
911

    
912
  def testAcquireRelease(self):
913
    self.assert_(self.ls.acquire('one'))
914
    self.assertEquals(self.ls._list_owned(), set(['one']))
915
    self.ls.release()
916
    self.assertEquals(self.ls._list_owned(), set())
917
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
918
    self.assertEquals(self.ls._list_owned(), set(['one']))
919
    self.ls.release()
920
    self.assertEquals(self.ls._list_owned(), set())
921
    self.ls.acquire(['one', 'two', 'three'])
922
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
923
    self.ls.release('one')
924
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
925
    self.ls.release(['three'])
926
    self.assertEquals(self.ls._list_owned(), set(['two']))
927
    self.ls.release()
928
    self.assertEquals(self.ls._list_owned(), set())
929
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
930
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
931
    self.ls.release()
932
    self.assertEquals(self.ls._list_owned(), set())
933

    
934
  def testNoDoubleAcquire(self):
935
    self.ls.acquire('one')
936
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
937
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
938
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
939
    self.ls.release()
940
    self.ls.acquire(['one', 'three'])
941
    self.ls.release('one')
942
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
943
    self.ls.release('three')
944

    
945
  def testNoWrongRelease(self):
946
    self.assertRaises(AssertionError, self.ls.release)
947
    self.ls.acquire('one')
948
    self.assertRaises(AssertionError, self.ls.release, 'two')
949

    
950
  def testAddRemove(self):
951
    self.ls.add('four')
952
    self.assertEquals(self.ls._list_owned(), set())
953
    self.assert_('four' in self.ls._names())
954
    self.ls.add(['five', 'six', 'seven'], acquired=1)
955
    self.assert_('five' in self.ls._names())
956
    self.assert_('six' in self.ls._names())
957
    self.assert_('seven' in self.ls._names())
958
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
959
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
960
    self.assert_('five' not in self.ls._names())
961
    self.assert_('six' not in self.ls._names())
962
    self.assertEquals(self.ls._list_owned(), set(['seven']))
963
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
964
    self.ls.remove('seven')
965
    self.assert_('seven' not in self.ls._names())
966
    self.assertEquals(self.ls._list_owned(), set([]))
967
    self.ls.acquire(None, shared=1)
968
    self.assertRaises(AssertionError, self.ls.add, 'eight')
969
    self.ls.release()
970
    self.ls.acquire(None)
971
    self.ls.add('eight', acquired=1)
972
    self.assert_('eight' in self.ls._names())
973
    self.assert_('eight' in self.ls._list_owned())
974
    self.ls.add('nine')
975
    self.assert_('nine' in self.ls._names())
976
    self.assert_('nine' not in self.ls._list_owned())
977
    self.ls.release()
978
    self.ls.remove(['two'])
979
    self.assert_('two' not in self.ls._names())
980
    self.ls.acquire('three')
981
    self.assertEquals(self.ls.remove(['three']), ['three'])
982
    self.assert_('three' not in self.ls._names())
983
    self.assertEquals(self.ls.remove('three'), [])
984
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
985
    self.assert_('one' not in self.ls._names())
986

    
987
  def testRemoveNonBlocking(self):
988
    self.ls.acquire('one')
989
    self.assertEquals(self.ls.remove('one'), ['one'])
990
    self.ls.acquire(['two', 'three'])
991
    self.assertEquals(self.ls.remove(['two', 'three']),
992
                      ['two', 'three'])
993

    
994
  def testNoDoubleAdd(self):
995
    self.assertRaises(errors.LockError, self.ls.add, 'two')
996
    self.ls.add('four')
997
    self.assertRaises(errors.LockError, self.ls.add, 'four')
998

    
999
  def testNoWrongRemoves(self):
1000
    self.ls.acquire(['one', 'three'], shared=1)
1001
    # Cannot remove 'two' while holding something which is not a superset
1002
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1003
    # Cannot remove 'three' as we are sharing it
1004
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1005

    
1006
  def testAcquireSetLock(self):
1007
    # acquire the set-lock exclusively
1008
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1009
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1010
    self.assertEquals(self.ls._is_owned(), True)
1011
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1012
    # I can still add/remove elements...
1013
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1014
    self.assert_(self.ls.add('six'))
1015
    self.ls.release()
1016
    # share the set-lock
1017
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1018
    # adding new elements is not possible
1019
    self.assertRaises(AssertionError, self.ls.add, 'five')
1020
    self.ls.release()
1021

    
1022
  def testAcquireWithRepetitions(self):
1023
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1024
                      set(['two', 'two', 'three']))
1025
    self.ls.release(['two', 'two'])
1026
    self.assertEquals(self.ls._list_owned(), set(['three']))
1027

    
1028
  def testEmptyAcquire(self):
1029
    # Acquire an empty list of locks...
1030
    self.assertEquals(self.ls.acquire([]), set())
1031
    self.assertEquals(self.ls._list_owned(), set())
1032
    # New locks can still be addded
1033
    self.assert_(self.ls.add('six'))
1034
    # "re-acquiring" is not an issue, since we had really acquired nothing
1035
    self.assertEquals(self.ls.acquire([], shared=1), set())
1036
    self.assertEquals(self.ls._list_owned(), set())
1037
    # We haven't really acquired anything, so we cannot release
1038
    self.assertRaises(AssertionError, self.ls.release)
1039

    
1040
  def _doLockSet(self, names, shared):
1041
    try:
1042
      self.ls.acquire(names, shared=shared)
1043
      self.done.put('DONE')
1044
      self.ls.release()
1045
    except errors.LockError:
1046
      self.done.put('ERR')
1047

    
1048
  def _doAddSet(self, names):
1049
    try:
1050
      self.ls.add(names, acquired=1)
1051
      self.done.put('DONE')
1052
      self.ls.release()
1053
    except errors.LockError:
1054
      self.done.put('ERR')
1055

    
1056
  def _doRemoveSet(self, names):
1057
    self.done.put(self.ls.remove(names))
1058

    
1059
  @_Repeat
1060
  def testConcurrentSharedAcquire(self):
1061
    self.ls.acquire(['one', 'two'], shared=1)
1062
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1063
    self._waitThreads()
1064
    self.assertEqual(self.done.get_nowait(), 'DONE')
1065
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1066
    self._waitThreads()
1067
    self.assertEqual(self.done.get_nowait(), 'DONE')
1068
    self._addThread(target=self._doLockSet, args=('three', 1))
1069
    self._waitThreads()
1070
    self.assertEqual(self.done.get_nowait(), 'DONE')
1071
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1072
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1073
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1074
    self.ls.release()
1075
    self._waitThreads()
1076
    self.assertEqual(self.done.get_nowait(), 'DONE')
1077
    self.assertEqual(self.done.get_nowait(), 'DONE')
1078

    
1079
  @_Repeat
1080
  def testConcurrentExclusiveAcquire(self):
1081
    self.ls.acquire(['one', 'two'])
1082
    self._addThread(target=self._doLockSet, args=('three', 1))
1083
    self._waitThreads()
1084
    self.assertEqual(self.done.get_nowait(), 'DONE')
1085
    self._addThread(target=self._doLockSet, args=('three', 0))
1086
    self._waitThreads()
1087
    self.assertEqual(self.done.get_nowait(), 'DONE')
1088
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1089
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1090
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1091
    self._addThread(target=self._doLockSet, args=('one', 0))
1092
    self._addThread(target=self._doLockSet, args=('one', 1))
1093
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1094
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1095
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1096
    self.ls.release()
1097
    self._waitThreads()
1098
    for _ in range(6):
1099
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1100

    
1101
  @_Repeat
1102
  def testSimpleAcquireTimeoutExpiring(self):
1103
    names = sorted(self.ls._names())
1104
    self.assert_(len(names) >= 3)
1105

    
1106
    # Get name of first lock
1107
    first = names[0]
1108

    
1109
    # Get name of last lock
1110
    last = names.pop()
1111

    
1112
    checks = [
1113
      # Block first and try to lock it again
1114
      (first, first),
1115

    
1116
      # Block last and try to lock all locks
1117
      (None, first),
1118

    
1119
      # Block last and try to lock it again
1120
      (last, last),
1121
      ]
1122

    
1123
    for (wanted, block) in checks:
1124
      # Lock in exclusive mode
1125
      self.assert_(self.ls.acquire(block, shared=0))
1126

    
1127
      def _AcquireOne():
1128
        # Try to get the same lock again with a timeout (should never succeed)
1129
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1130
        if acquired:
1131
          self.done.put("acquired")
1132
          self.ls.release()
1133
        else:
1134
          self.assert_(acquired is None)
1135
          self.assertFalse(self.ls._list_owned())
1136
          self.assertFalse(self.ls._is_owned())
1137
          self.done.put("not acquired")
1138

    
1139
      self._addThread(target=_AcquireOne)
1140

    
1141
      # Wait for timeout in thread to expire
1142
      self._waitThreads()
1143

    
1144
      # Release exclusive lock again
1145
      self.ls.release()
1146

    
1147
      self.assertEqual(self.done.get_nowait(), "not acquired")
1148
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1149

    
1150
  @_Repeat
1151
  def testDelayedAndExpiringLockAcquire(self):
1152
    self._setUpLS()
1153
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1154

    
1155
    for expire in (False, True):
1156
      names = sorted(self.ls._names())
1157
      self.assertEqual(len(names), 8)
1158

    
1159
      lock_ev = dict([(i, threading.Event()) for i in names])
1160

    
1161
      # Lock all in exclusive mode
1162
      self.assert_(self.ls.acquire(names, shared=0))
1163

    
1164
      if expire:
1165
        # We'll wait at least 300ms per lock
1166
        lockwait = len(names) * [0.3]
1167

    
1168
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1169
        # this gives us up to 2.4s to fail.
1170
        lockall_timeout = 0.4
1171
      else:
1172
        # This should finish rather quickly
1173
        lockwait = None
1174
        lockall_timeout = len(names) * 5.0
1175

    
1176
      def _LockAll():
1177
        def acquire_notification(name):
1178
          if not expire:
1179
            self.done.put("getting %s" % name)
1180

    
1181
          # Kick next lock
1182
          lock_ev[name].set()
1183

    
1184
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1185
                           test_notify=acquire_notification):
1186
          self.done.put("got all")
1187
          self.ls.release()
1188
        else:
1189
          self.done.put("timeout on all")
1190

    
1191
        # Notify all locks
1192
        for ev in lock_ev.values():
1193
          ev.set()
1194

    
1195
      t = self._addThread(target=_LockAll)
1196

    
1197
      for idx, name in enumerate(names):
1198
        # Wait for actual acquire on this lock to start
1199
        lock_ev[name].wait(10.0)
1200

    
1201
        if expire and t.isAlive():
1202
          # Wait some time after getting the notification to make sure the lock
1203
          # acquire will expire
1204
          SafeSleep(lockwait[idx])
1205

    
1206
        self.ls.release(names=name)
1207

    
1208
      self.assertFalse(self.ls._list_owned())
1209

    
1210
      self._waitThreads()
1211

    
1212
      if expire:
1213
        # Not checking which locks were actually acquired. Doing so would be
1214
        # too timing-dependant.
1215
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1216
      else:
1217
        for i in names:
1218
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1219
        self.assertEqual(self.done.get_nowait(), "got all")
1220
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1221

    
1222
  @_Repeat
1223
  def testConcurrentRemove(self):
1224
    self.ls.add('four')
1225
    self.ls.acquire(['one', 'two', 'four'])
1226
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1227
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1228
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1229
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1230
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1231
    self.ls.remove('one')
1232
    self.ls.release()
1233
    self._waitThreads()
1234
    for i in range(4):
1235
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1236
    self.ls.add(['five', 'six'], acquired=1)
1237
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1238
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1239
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1240
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1241
    self.ls.remove('five')
1242
    self.ls.release()
1243
    self._waitThreads()
1244
    for i in range(4):
1245
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1246
    self.ls.acquire(['three', 'four'])
1247
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1248
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1249
    self.ls.remove('four')
1250
    self._waitThreads()
1251
    self.assertEqual(self.done.get_nowait(), ['six'])
1252
    self._addThread(target=self._doRemoveSet, args=(['two']))
1253
    self._waitThreads()
1254
    self.assertEqual(self.done.get_nowait(), ['two'])
1255
    self.ls.release()
1256
    # reset lockset
1257
    self._setUpLS()
1258

    
1259
  @_Repeat
1260
  def testConcurrentSharedSetLock(self):
1261
    # share the set-lock...
1262
    self.ls.acquire(None, shared=1)
1263
    # ...another thread can share it too
1264
    self._addThread(target=self._doLockSet, args=(None, 1))
1265
    self._waitThreads()
1266
    self.assertEqual(self.done.get_nowait(), 'DONE')
1267
    # ...or just share some elements
1268
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1269
    self._waitThreads()
1270
    self.assertEqual(self.done.get_nowait(), 'DONE')
1271
    # ...but not add new ones or remove any
1272
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1273
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1274
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1275
    # this just releases the set-lock
1276
    self.ls.release([])
1277
    t.join(60)
1278
    self.assertEqual(self.done.get_nowait(), 'DONE')
1279
    # release the lock on the actual elements so remove() can proceed too
1280
    self.ls.release()
1281
    self._waitThreads()
1282
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1283
    # reset lockset
1284
    self._setUpLS()
1285

    
1286
  @_Repeat
1287
  def testConcurrentExclusiveSetLock(self):
1288
    # acquire the set-lock...
1289
    self.ls.acquire(None, shared=0)
1290
    # ...no one can do anything else
1291
    self._addThread(target=self._doLockSet, args=(None, 1))
1292
    self._addThread(target=self._doLockSet, args=(None, 0))
1293
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1294
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1295
    self._addThread(target=self._doAddSet, args=(['nine']))
1296
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1297
    self.ls.release()
1298
    self._waitThreads()
1299
    for _ in range(5):
1300
      self.assertEqual(self.done.get(True, 1), 'DONE')
1301
    # cleanup
1302
    self._setUpLS()
1303

    
1304
  @_Repeat
1305
  def testConcurrentSetLockAdd(self):
1306
    self.ls.acquire('one')
1307
    # Another thread wants the whole SetLock
1308
    self._addThread(target=self._doLockSet, args=(None, 0))
1309
    self._addThread(target=self._doLockSet, args=(None, 1))
1310
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1311
    self.assertRaises(AssertionError, self.ls.add, 'four')
1312
    self.ls.release()
1313
    self._waitThreads()
1314
    self.assertEqual(self.done.get_nowait(), 'DONE')
1315
    self.assertEqual(self.done.get_nowait(), 'DONE')
1316
    self.ls.acquire(None)
1317
    self._addThread(target=self._doLockSet, args=(None, 0))
1318
    self._addThread(target=self._doLockSet, args=(None, 1))
1319
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1320
    self.ls.add('four')
1321
    self.ls.add('five', acquired=1)
1322
    self.ls.add('six', acquired=1, shared=1)
1323
    self.assertEquals(self.ls._list_owned(),
1324
      set(['one', 'two', 'three', 'five', 'six']))
1325
    self.assertEquals(self.ls._is_owned(), True)
1326
    self.assertEquals(self.ls._names(),
1327
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1328
    self.ls.release()
1329
    self._waitThreads()
1330
    self.assertEqual(self.done.get_nowait(), 'DONE')
1331
    self.assertEqual(self.done.get_nowait(), 'DONE')
1332
    self._setUpLS()
1333

    
1334
  @_Repeat
1335
  def testEmptyLockSet(self):
1336
    # get the set-lock
1337
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1338
    # now empty it...
1339
    self.ls.remove(['one', 'two', 'three'])
1340
    # and adds/locks by another thread still wait
1341
    self._addThread(target=self._doAddSet, args=(['nine']))
1342
    self._addThread(target=self._doLockSet, args=(None, 1))
1343
    self._addThread(target=self._doLockSet, args=(None, 0))
1344
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1345
    self.ls.release()
1346
    self._waitThreads()
1347
    for _ in range(3):
1348
      self.assertEqual(self.done.get_nowait(), 'DONE')
1349
    # empty it again...
1350
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1351
    # now share it...
1352
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1353
    # other sharers can go, adds still wait
1354
    self._addThread(target=self._doLockSet, args=(None, 1))
1355
    self._waitThreads()
1356
    self.assertEqual(self.done.get_nowait(), 'DONE')
1357
    self._addThread(target=self._doAddSet, args=(['nine']))
1358
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1359
    self.ls.release()
1360
    self._waitThreads()
1361
    self.assertEqual(self.done.get_nowait(), 'DONE')
1362
    self._setUpLS()
1363

    
1364
  def testPriority(self):
1365
    def _Acquire(prev, next, name, priority, success_fn):
1366
      prev.wait()
1367
      self.assert_(self.ls.acquire(name, shared=0,
1368
                                   priority=priority,
1369
                                   test_notify=lambda _: next.set()))
1370
      try:
1371
        success_fn()
1372
      finally:
1373
        self.ls.release()
1374

    
1375
    # Get all in exclusive mode
1376
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1377

    
1378
    done_two = Queue.Queue(0)
1379

    
1380
    first = threading.Event()
1381
    prev = first
1382

    
1383
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1384
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1385

    
1386
    # Use a deterministic random generator
1387
    random.Random(741).shuffle(acquires)
1388

    
1389
    for (name, prio, done) in acquires:
1390
      ev = threading.Event()
1391
      self._addThread(target=_Acquire,
1392
                      args=(prev, ev, name, prio,
1393
                            compat.partial(done.put, "Prio%s" % prio)))
1394
      prev = ev
1395

    
1396
    # Start acquires
1397
    first.set()
1398

    
1399
    # Wait for last acquire to start
1400
    prev.wait()
1401

    
1402
    # Let threads acquire locks
1403
    self.ls.release()
1404

    
1405
    # Wait for threads to finish
1406
    self._waitThreads()
1407

    
1408
    for i in range(1, 33):
1409
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1410
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1411

    
1412
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1413
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1414

    
1415

    
1416
class TestGanetiLockManager(_ThreadedTestCase):
1417

    
1418
  def setUp(self):
1419
    _ThreadedTestCase.setUp(self)
1420
    self.nodes=['n1', 'n2']
1421
    self.instances=['i1', 'i2', 'i3']
1422
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1423
                                        instances=self.instances)
1424

    
1425
  def tearDown(self):
1426
    # Don't try this at home...
1427
    locking.GanetiLockManager._instance = None
1428

    
1429
  def testLockingConstants(self):
1430
    # The locking library internally cheats by assuming its constants have some
1431
    # relationships with each other. Check those hold true.
1432
    # This relationship is also used in the Processor to recursively acquire
1433
    # the right locks. Again, please don't break it.
1434
    for i in range(len(locking.LEVELS)):
1435
      self.assertEqual(i, locking.LEVELS[i])
1436

    
1437
  def testDoubleGLFails(self):
1438
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1439

    
1440
  def testLockNames(self):
1441
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1442
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1443
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1444
                     set(self.instances))
1445

    
1446
  def testInitAndResources(self):
1447
    locking.GanetiLockManager._instance = None
1448
    self.GL = locking.GanetiLockManager([], [])
1449
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1450
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1451
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1452

    
1453
    locking.GanetiLockManager._instance = None
1454
    self.GL = locking.GanetiLockManager(self.nodes, [])
1455
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1456
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1457
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1458

    
1459
    locking.GanetiLockManager._instance = None
1460
    self.GL = locking.GanetiLockManager([], self.instances)
1461
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1462
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1463
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1464
                     set(self.instances))
1465

    
1466
  def testAcquireRelease(self):
1467
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1468
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1469
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1470
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1471
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1472
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1473
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1474
    self.GL.release(locking.LEVEL_NODE)
1475
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1476
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1477
    self.GL.release(locking.LEVEL_INSTANCE)
1478
    self.assertRaises(errors.LockError, self.GL.acquire,
1479
                      locking.LEVEL_INSTANCE, ['i5'])
1480
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1481
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1482

    
1483
  def testAcquireWholeSets(self):
1484
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1485
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1486
                      set(self.instances))
1487
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1488
                      set(self.instances))
1489
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1490
                      set(self.nodes))
1491
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1492
                      set(self.nodes))
1493
    self.GL.release(locking.LEVEL_NODE)
1494
    self.GL.release(locking.LEVEL_INSTANCE)
1495
    self.GL.release(locking.LEVEL_CLUSTER)
1496

    
1497
  def testAcquireWholeAndPartial(self):
1498
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1499
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1500
                      set(self.instances))
1501
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1502
                      set(self.instances))
1503
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1504
                      set(['n2']))
1505
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1506
                      set(['n2']))
1507
    self.GL.release(locking.LEVEL_NODE)
1508
    self.GL.release(locking.LEVEL_INSTANCE)
1509
    self.GL.release(locking.LEVEL_CLUSTER)
1510

    
1511
  def testBGLDependency(self):
1512
    self.assertRaises(AssertionError, self.GL.acquire,
1513
                      locking.LEVEL_NODE, ['n1', 'n2'])
1514
    self.assertRaises(AssertionError, self.GL.acquire,
1515
                      locking.LEVEL_INSTANCE, ['i3'])
1516
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1517
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1518
    self.assertRaises(AssertionError, self.GL.release,
1519
                      locking.LEVEL_CLUSTER, ['BGL'])
1520
    self.assertRaises(AssertionError, self.GL.release,
1521
                      locking.LEVEL_CLUSTER)
1522
    self.GL.release(locking.LEVEL_NODE)
1523
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1524
    self.assertRaises(AssertionError, self.GL.release,
1525
                      locking.LEVEL_CLUSTER, ['BGL'])
1526
    self.assertRaises(AssertionError, self.GL.release,
1527
                      locking.LEVEL_CLUSTER)
1528
    self.GL.release(locking.LEVEL_INSTANCE)
1529

    
1530
  def testWrongOrder(self):
1531
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1532
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1533
    self.assertRaises(AssertionError, self.GL.acquire,
1534
                      locking.LEVEL_NODE, ['n1'])
1535
    self.assertRaises(AssertionError, self.GL.acquire,
1536
                      locking.LEVEL_INSTANCE, ['i2'])
1537

    
1538
  # Helper function to run as a thread that shared the BGL and then acquires
1539
  # some locks at another level.
1540
  def _doLock(self, level, names, shared):
1541
    try:
1542
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1543
      self.GL.acquire(level, names, shared=shared)
1544
      self.done.put('DONE')
1545
      self.GL.release(level)
1546
      self.GL.release(locking.LEVEL_CLUSTER)
1547
    except errors.LockError:
1548
      self.done.put('ERR')
1549

    
1550
  @_Repeat
1551
  def testConcurrency(self):
1552
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1553
    self._addThread(target=self._doLock,
1554
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1555
    self._waitThreads()
1556
    self.assertEqual(self.done.get_nowait(), 'DONE')
1557
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1558
    self._addThread(target=self._doLock,
1559
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1560
    self._waitThreads()
1561
    self.assertEqual(self.done.get_nowait(), 'DONE')
1562
    self._addThread(target=self._doLock,
1563
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1564
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1565
    self.GL.release(locking.LEVEL_INSTANCE)
1566
    self._waitThreads()
1567
    self.assertEqual(self.done.get_nowait(), 'DONE')
1568
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1569
    self._addThread(target=self._doLock,
1570
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1571
    self._waitThreads()
1572
    self.assertEqual(self.done.get_nowait(), 'DONE')
1573
    self._addThread(target=self._doLock,
1574
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1575
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1576
    self.GL.release(locking.LEVEL_INSTANCE)
1577
    self._waitThreads()
1578
    self.assertEqual(self.done.get(True, 1), 'DONE')
1579
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1580

    
1581

    
1582
class TestLockMonitor(_ThreadedTestCase):
1583
  def setUp(self):
1584
    _ThreadedTestCase.setUp(self)
1585
    self.lm = locking.LockMonitor()
1586

    
1587
  def testSingleThread(self):
1588
    locks = []
1589

    
1590
    for i in range(100):
1591
      name = "TestLock%s" % i
1592
      locks.append(locking.SharedLock(name, monitor=self.lm))
1593

    
1594
    self.assertEqual(len(self.lm._locks), len(locks))
1595

    
1596
    self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
1597
                     100)
1598

    
1599
    # Delete all locks
1600
    del locks[:]
1601

    
1602
    # The garbage collector might needs some time
1603
    def _CheckLocks():
1604
      if self.lm._locks:
1605
        raise utils.RetryAgain()
1606

    
1607
    utils.Retry(_CheckLocks, 0.1, 30.0)
1608

    
1609
    self.assertFalse(self.lm._locks)
1610

    
1611
  def testMultiThread(self):
1612
    locks = []
1613

    
1614
    def _CreateLock(prev, next, name):
1615
      prev.wait()
1616
      locks.append(locking.SharedLock(name, monitor=self.lm))
1617
      if next:
1618
        next.set()
1619

    
1620
    expnames = []
1621

    
1622
    first = threading.Event()
1623
    prev = first
1624

    
1625
    # Use a deterministic random generator
1626
    for i in random.Random(4263).sample(range(100), 33):
1627
      name = "MtTestLock%s" % i
1628
      expnames.append(name)
1629

    
1630
      ev = threading.Event()
1631
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1632
      prev = ev
1633

    
1634
    # Add locks
1635
    first.set()
1636
    self._waitThreads()
1637

    
1638
    # Check order in which locks were added
1639
    self.assertEqual([i.name for i in locks], expnames)
1640

    
1641
    # Sync queries are not supported
1642
    self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1643

    
1644
    # Check query result
1645
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1646
                                        False),
1647
                     [[name, None, None, []]
1648
                      for name in utils.NiceSort(expnames)])
1649

    
1650
    # Test exclusive acquire
1651
    for tlock in locks[::4]:
1652
      tlock.acquire(shared=0)
1653
      try:
1654
        def _GetExpResult(name):
1655
          if tlock.name == name:
1656
            return [name, "exclusive", [threading.currentThread().getName()],
1657
                    []]
1658
          return [name, None, None, []]
1659

    
1660
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1661
                                             "pending"], False),
1662
                         [_GetExpResult(name)
1663
                          for name in utils.NiceSort(expnames)])
1664
      finally:
1665
        tlock.release()
1666

    
1667
    # Test shared acquire
1668
    def _Acquire(lock, shared, ev, notify):
1669
      lock.acquire(shared=shared)
1670
      try:
1671
        notify.set()
1672
        ev.wait()
1673
      finally:
1674
        lock.release()
1675

    
1676
    for tlock1 in locks[::11]:
1677
      for tlock2 in locks[::-15]:
1678
        if tlock2 == tlock1:
1679
          # Avoid deadlocks
1680
          continue
1681

    
1682
        for tlock3 in locks[::10]:
1683
          if tlock3 in (tlock2, tlock1):
1684
            # Avoid deadlocks
1685
            continue
1686

    
1687
          releaseev = threading.Event()
1688

    
1689
          # Acquire locks
1690
          acquireev = []
1691
          tthreads1 = []
1692
          for i in range(3):
1693
            ev = threading.Event()
1694
            tthreads1.append(self._addThread(target=_Acquire,
1695
                                             args=(tlock1, 1, releaseev, ev)))
1696
            acquireev.append(ev)
1697

    
1698
          ev = threading.Event()
1699
          tthread2 = self._addThread(target=_Acquire,
1700
                                     args=(tlock2, 1, releaseev, ev))
1701
          acquireev.append(ev)
1702

    
1703
          ev = threading.Event()
1704
          tthread3 = self._addThread(target=_Acquire,
1705
                                     args=(tlock3, 0, releaseev, ev))
1706
          acquireev.append(ev)
1707

    
1708
          # Wait for all locks to be acquired
1709
          for i in acquireev:
1710
            i.wait()
1711

    
1712
          # Check query result
1713
          for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1714
                                                         "owner"], False):
1715
            if name == tlock1.name:
1716
              self.assertEqual(mode, "shared")
1717
              self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1718
              continue
1719

    
1720
            if name == tlock2.name:
1721
              self.assertEqual(mode, "shared")
1722
              self.assertEqual(owner, [tthread2.getName()])
1723
              continue
1724

    
1725
            if name == tlock3.name:
1726
              self.assertEqual(mode, "exclusive")
1727
              self.assertEqual(owner, [tthread3.getName()])
1728
              continue
1729

    
1730
            self.assert_(name in expnames)
1731
            self.assert_(mode is None)
1732
            self.assert_(owner is None)
1733

    
1734
          # Release locks again
1735
          releaseev.set()
1736

    
1737
          self._waitThreads()
1738

    
1739
          self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1740
                           [[name, None, None]
1741
                            for name in utils.NiceSort(expnames)])
1742

    
1743
  def testDelete(self):
1744
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1745

    
1746
    self.assertEqual(len(self.lm._locks), 1)
1747
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1748
                     [[lock.name, None, None]])
1749

    
1750
    lock.delete()
1751

    
1752
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1753
                     [[lock.name, "deleted", None]])
1754
    self.assertEqual(len(self.lm._locks), 1)
1755

    
1756
  def testPending(self):
1757
    def _Acquire(lock, shared, prev, next):
1758
      prev.wait()
1759

    
1760
      lock.acquire(shared=shared, test_notify=next.set)
1761
      try:
1762
        pass
1763
      finally:
1764
        lock.release()
1765

    
1766
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
1767

    
1768
    for shared in [0, 1]:
1769
      lock.acquire()
1770
      try:
1771
        self.assertEqual(len(self.lm._locks), 1)
1772
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1773
                         [[lock.name, "exclusive",
1774
                           [threading.currentThread().getName()]]])
1775

    
1776
        threads = []
1777

    
1778
        first = threading.Event()
1779
        prev = first
1780

    
1781
        for i in range(5):
1782
          ev = threading.Event()
1783
          threads.append(self._addThread(target=_Acquire,
1784
                                          args=(lock, shared, prev, ev)))
1785
          prev = ev
1786

    
1787
        # Start acquires
1788
        first.set()
1789

    
1790
        # Wait for last acquire to start waiting
1791
        prev.wait()
1792

    
1793
        # NOTE: This works only because QueryLocks will acquire the
1794
        # lock-internal lock again and won't be able to get the information
1795
        # until it has the lock. By then the acquire should be registered in
1796
        # SharedLock.__pending (otherwise it's a bug).
1797

    
1798
        # All acquires are waiting now
1799
        if shared:
1800
          pending = [("shared", sorted([t.getName() for t in threads]))]
1801
        else:
1802
          pending = [("exclusive", [t.getName()]) for t in threads]
1803

    
1804
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1805
                                             "pending"], False),
1806
                         [[lock.name, "exclusive",
1807
                           [threading.currentThread().getName()],
1808
                           pending]])
1809

    
1810
        self.assertEqual(len(self.lm._locks), 1)
1811
      finally:
1812
        lock.release()
1813

    
1814
      self._waitThreads()
1815

    
1816
      # No pending acquires
1817
      self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1818
                                          False),
1819
                       [[lock.name, None, None, []]])
1820

    
1821
      self.assertEqual(len(self.lm._locks), 1)
1822

    
1823

    
1824
if __name__ == '__main__':
1825
  testutils.GanetiTestProgram()