Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 819ca990

History | View | Annotate | Download (61.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import itertools
32

    
33
from ganeti import locking
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import compat
37

    
38
import testutils
39

    
40

    
41
# This is used to test the ssynchronize decorator.
42
# Since it's passed as input to a decorator it must be declared as a global.
43
_decoratorlock = locking.SharedLock("decorator lock")
44

    
45
#: List for looping tests
46
ITERATIONS = range(8)
47

    
48

    
49
def _Repeat(fn):
50
  """Decorator for executing a function many times"""
51
  def wrapper(*args, **kwargs):
52
    for i in ITERATIONS:
53
      fn(*args, **kwargs)
54
  return wrapper
55

    
56

    
57
def SafeSleep(duration):
58
  start = time.time()
59
  while True:
60
    delay = start + duration - time.time()
61
    if delay <= 0.0:
62
      break
63
    time.sleep(delay)
64

    
65

    
66
class _ThreadedTestCase(unittest.TestCase):
67
  """Test class that supports adding/waiting on threads"""
68
  def setUp(self):
69
    unittest.TestCase.setUp(self)
70
    self.done = Queue.Queue(0)
71
    self.threads = []
72

    
73
  def _addThread(self, *args, **kwargs):
74
    """Create and remember a new thread"""
75
    t = threading.Thread(*args, **kwargs)
76
    self.threads.append(t)
77
    t.start()
78
    return t
79

    
80
  def _waitThreads(self):
81
    """Wait for all our threads to finish"""
82
    for t in self.threads:
83
      t.join(60)
84
      self.failIf(t.isAlive())
85
    self.threads = []
86

    
87

    
88
class _ConditionTestCase(_ThreadedTestCase):
89
  """Common test case for conditions"""
90

    
91
  def setUp(self, cls):
92
    _ThreadedTestCase.setUp(self)
93
    self.lock = threading.Lock()
94
    self.cond = cls(self.lock)
95

    
96
  def _testAcquireRelease(self):
97
    self.assertFalse(self.cond._is_owned())
98
    self.assertRaises(RuntimeError, self.cond.wait)
99
    self.assertRaises(RuntimeError, self.cond.notifyAll)
100

    
101
    self.cond.acquire()
102
    self.assert_(self.cond._is_owned())
103
    self.cond.notifyAll()
104
    self.assert_(self.cond._is_owned())
105
    self.cond.release()
106

    
107
    self.assertFalse(self.cond._is_owned())
108
    self.assertRaises(RuntimeError, self.cond.wait)
109
    self.assertRaises(RuntimeError, self.cond.notifyAll)
110

    
111
  def _testNotification(self):
112
    def _NotifyAll():
113
      self.done.put("NE")
114
      self.cond.acquire()
115
      self.done.put("NA")
116
      self.cond.notifyAll()
117
      self.done.put("NN")
118
      self.cond.release()
119

    
120
    self.cond.acquire()
121
    self._addThread(target=_NotifyAll)
122
    self.assertEqual(self.done.get(True, 1), "NE")
123
    self.assertRaises(Queue.Empty, self.done.get_nowait)
124
    self.cond.wait()
125
    self.assertEqual(self.done.get(True, 1), "NA")
126
    self.assertEqual(self.done.get(True, 1), "NN")
127
    self.assert_(self.cond._is_owned())
128
    self.cond.release()
129
    self.assertFalse(self.cond._is_owned())
130

    
131

    
132
class TestSingleNotifyPipeCondition(_ConditionTestCase):
133
  """SingleNotifyPipeCondition tests"""
134

    
135
  def setUp(self):
136
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
137

    
138
  def testAcquireRelease(self):
139
    self._testAcquireRelease()
140

    
141
  def testNotification(self):
142
    self._testNotification()
143

    
144
  def testWaitReuse(self):
145
    self.cond.acquire()
146
    self.cond.wait(0)
147
    self.cond.wait(0.1)
148
    self.cond.release()
149

    
150
  def testNoNotifyReuse(self):
151
    self.cond.acquire()
152
    self.cond.notifyAll()
153
    self.assertRaises(RuntimeError, self.cond.wait)
154
    self.assertRaises(RuntimeError, self.cond.notifyAll)
155
    self.cond.release()
156

    
157

    
158
class TestPipeCondition(_ConditionTestCase):
159
  """PipeCondition tests"""
160

    
161
  def setUp(self):
162
    _ConditionTestCase.setUp(self, locking.PipeCondition)
163

    
164
  def testAcquireRelease(self):
165
    self._testAcquireRelease()
166

    
167
  def testNotification(self):
168
    self._testNotification()
169

    
170
  def _TestWait(self, fn):
171
    threads = [
172
      self._addThread(target=fn),
173
      self._addThread(target=fn),
174
      self._addThread(target=fn),
175
      ]
176

    
177
    # Wait for threads to be waiting
178
    for _ in threads:
179
      self.assertEqual(self.done.get(True, 1), "A")
180

    
181
    self.assertRaises(Queue.Empty, self.done.get_nowait)
182

    
183
    self.cond.acquire()
184
    self.assertEqual(len(self.cond._waiters), 3)
185
    self.assertEqual(self.cond._waiters, set(threads))
186
    # This new thread can't acquire the lock, and thus call wait, before we
187
    # release it
188
    self._addThread(target=fn)
189
    self.cond.notifyAll()
190
    self.assertRaises(Queue.Empty, self.done.get_nowait)
191
    self.cond.release()
192

    
193
    # We should now get 3 W and 1 A (for the new thread) in whatever order
194
    w = 0
195
    a = 0
196
    for i in range(4):
197
      got = self.done.get(True, 1)
198
      if got == "W":
199
        w += 1
200
      elif got == "A":
201
        a += 1
202
      else:
203
        self.fail("Got %s on the done queue" % got)
204

    
205
    self.assertEqual(w, 3)
206
    self.assertEqual(a, 1)
207

    
208
    self.cond.acquire()
209
    self.cond.notifyAll()
210
    self.cond.release()
211
    self._waitThreads()
212
    self.assertEqual(self.done.get_nowait(), "W")
213
    self.assertRaises(Queue.Empty, self.done.get_nowait)
214

    
215
  def testBlockingWait(self):
216
    def _BlockingWait():
217
      self.cond.acquire()
218
      self.done.put("A")
219
      self.cond.wait()
220
      self.cond.release()
221
      self.done.put("W")
222

    
223
    self._TestWait(_BlockingWait)
224

    
225
  def testLongTimeoutWait(self):
226
    def _Helper():
227
      self.cond.acquire()
228
      self.done.put("A")
229
      self.cond.wait(15.0)
230
      self.cond.release()
231
      self.done.put("W")
232

    
233
    self._TestWait(_Helper)
234

    
235
  def _TimeoutWait(self, timeout, check):
236
    self.cond.acquire()
237
    self.cond.wait(timeout)
238
    self.cond.release()
239
    self.done.put(check)
240

    
241
  def testShortTimeoutWait(self):
242
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
243
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
244
    self._waitThreads()
245
    self.assertEqual(self.done.get_nowait(), "T1")
246
    self.assertEqual(self.done.get_nowait(), "T1")
247
    self.assertRaises(Queue.Empty, self.done.get_nowait)
248

    
249
  def testZeroTimeoutWait(self):
250
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
251
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
252
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
253
    self._waitThreads()
254
    self.assertEqual(self.done.get_nowait(), "T0")
255
    self.assertEqual(self.done.get_nowait(), "T0")
256
    self.assertEqual(self.done.get_nowait(), "T0")
257
    self.assertRaises(Queue.Empty, self.done.get_nowait)
258

    
259

    
260
class TestSharedLock(_ThreadedTestCase):
261
  """SharedLock tests"""
262

    
263
  def setUp(self):
264
    _ThreadedTestCase.setUp(self)
265
    self.sl = locking.SharedLock("TestSharedLock")
266

    
267
  def testSequenceAndOwnership(self):
268
    self.assertFalse(self.sl._is_owned())
269
    self.sl.acquire(shared=1)
270
    self.assert_(self.sl._is_owned())
271
    self.assert_(self.sl._is_owned(shared=1))
272
    self.assertFalse(self.sl._is_owned(shared=0))
273
    self.sl.release()
274
    self.assertFalse(self.sl._is_owned())
275
    self.sl.acquire()
276
    self.assert_(self.sl._is_owned())
277
    self.assertFalse(self.sl._is_owned(shared=1))
278
    self.assert_(self.sl._is_owned(shared=0))
279
    self.sl.release()
280
    self.assertFalse(self.sl._is_owned())
281
    self.sl.acquire(shared=1)
282
    self.assert_(self.sl._is_owned())
283
    self.assert_(self.sl._is_owned(shared=1))
284
    self.assertFalse(self.sl._is_owned(shared=0))
285
    self.sl.release()
286
    self.assertFalse(self.sl._is_owned())
287

    
288
  def testBooleanValue(self):
289
    # semaphores are supposed to return a true value on a successful acquire
290
    self.assert_(self.sl.acquire(shared=1))
291
    self.sl.release()
292
    self.assert_(self.sl.acquire())
293
    self.sl.release()
294

    
295
  def testDoubleLockingStoE(self):
296
    self.sl.acquire(shared=1)
297
    self.assertRaises(AssertionError, self.sl.acquire)
298

    
299
  def testDoubleLockingEtoS(self):
300
    self.sl.acquire()
301
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
302

    
303
  def testDoubleLockingStoS(self):
304
    self.sl.acquire(shared=1)
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingEtoE(self):
308
    self.sl.acquire()
309
    self.assertRaises(AssertionError, self.sl.acquire)
310

    
311
  # helper functions: called in a separate thread they acquire the lock, send
312
  # their identifier on the done queue, then release it.
313
  def _doItSharer(self):
314
    try:
315
      self.sl.acquire(shared=1)
316
      self.done.put('SHR')
317
      self.sl.release()
318
    except errors.LockError:
319
      self.done.put('ERR')
320

    
321
  def _doItExclusive(self):
322
    try:
323
      self.sl.acquire()
324
      self.done.put('EXC')
325
      self.sl.release()
326
    except errors.LockError:
327
      self.done.put('ERR')
328

    
329
  def _doItDelete(self):
330
    try:
331
      self.sl.delete()
332
      self.done.put('DEL')
333
    except errors.LockError:
334
      self.done.put('ERR')
335

    
336
  def testSharersCanCoexist(self):
337
    self.sl.acquire(shared=1)
338
    threading.Thread(target=self._doItSharer).start()
339
    self.assert_(self.done.get(True, 1))
340
    self.sl.release()
341

    
342
  @_Repeat
343
  def testExclusiveBlocksExclusive(self):
344
    self.sl.acquire()
345
    self._addThread(target=self._doItExclusive)
346
    self.assertRaises(Queue.Empty, self.done.get_nowait)
347
    self.sl.release()
348
    self._waitThreads()
349
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
350

    
351
  @_Repeat
352
  def testExclusiveBlocksDelete(self):
353
    self.sl.acquire()
354
    self._addThread(target=self._doItDelete)
355
    self.assertRaises(Queue.Empty, self.done.get_nowait)
356
    self.sl.release()
357
    self._waitThreads()
358
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
359
    self.sl = locking.SharedLock(self.sl.name)
360

    
361
  @_Repeat
362
  def testExclusiveBlocksSharer(self):
363
    self.sl.acquire()
364
    self._addThread(target=self._doItSharer)
365
    self.assertRaises(Queue.Empty, self.done.get_nowait)
366
    self.sl.release()
367
    self._waitThreads()
368
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
369

    
370
  @_Repeat
371
  def testSharerBlocksExclusive(self):
372
    self.sl.acquire(shared=1)
373
    self._addThread(target=self._doItExclusive)
374
    self.assertRaises(Queue.Empty, self.done.get_nowait)
375
    self.sl.release()
376
    self._waitThreads()
377
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
378

    
379
  @_Repeat
380
  def testSharerBlocksDelete(self):
381
    self.sl.acquire(shared=1)
382
    self._addThread(target=self._doItDelete)
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384
    self.sl.release()
385
    self._waitThreads()
386
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
387
    self.sl = locking.SharedLock(self.sl.name)
388

    
389
  @_Repeat
390
  def testWaitingExclusiveBlocksSharer(self):
391
    """SKIPPED testWaitingExclusiveBlockSharer"""
392
    return
393

    
394
    self.sl.acquire(shared=1)
395
    # the lock is acquired in shared mode...
396
    self._addThread(target=self._doItExclusive)
397
    # ...but now an exclusive is waiting...
398
    self._addThread(target=self._doItSharer)
399
    # ...so the sharer should be blocked as well
400
    self.assertRaises(Queue.Empty, self.done.get_nowait)
401
    self.sl.release()
402
    self._waitThreads()
403
    # The exclusive passed before
404
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
405
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
406

    
407
  @_Repeat
408
  def testWaitingSharerBlocksExclusive(self):
409
    """SKIPPED testWaitingSharerBlocksExclusive"""
410
    return
411

    
412
    self.sl.acquire()
413
    # the lock is acquired in exclusive mode...
414
    self._addThread(target=self._doItSharer)
415
    # ...but now a sharer is waiting...
416
    self._addThread(target=self._doItExclusive)
417
    # ...the exclusive is waiting too...
418
    self.assertRaises(Queue.Empty, self.done.get_nowait)
419
    self.sl.release()
420
    self._waitThreads()
421
    # The sharer passed before
422
    self.assertEqual(self.done.get_nowait(), 'SHR')
423
    self.assertEqual(self.done.get_nowait(), 'EXC')
424

    
425
  def testDelete(self):
426
    self.sl.delete()
427
    self.assertRaises(errors.LockError, self.sl.acquire)
428
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
429
    self.assertRaises(errors.LockError, self.sl.delete)
430

    
431
  def testDeleteTimeout(self):
432
    self.sl.delete(timeout=60)
433

    
434
  def testNoDeleteIfSharer(self):
435
    self.sl.acquire(shared=1)
436
    self.assertRaises(AssertionError, self.sl.delete)
437

    
438
  @_Repeat
439
  def testDeletePendingSharersExclusiveDelete(self):
440
    self.sl.acquire()
441
    self._addThread(target=self._doItSharer)
442
    self._addThread(target=self._doItSharer)
443
    self._addThread(target=self._doItExclusive)
444
    self._addThread(target=self._doItDelete)
445
    self.sl.delete()
446
    self._waitThreads()
447
    # The threads who were pending return ERR
448
    for _ in range(4):
449
      self.assertEqual(self.done.get_nowait(), 'ERR')
450
    self.sl = locking.SharedLock(self.sl.name)
451

    
452
  @_Repeat
453
  def testDeletePendingDeleteExclusiveSharers(self):
454
    self.sl.acquire()
455
    self._addThread(target=self._doItDelete)
456
    self._addThread(target=self._doItExclusive)
457
    self._addThread(target=self._doItSharer)
458
    self._addThread(target=self._doItSharer)
459
    self.sl.delete()
460
    self._waitThreads()
461
    # The two threads who were pending return both ERR
462
    self.assertEqual(self.done.get_nowait(), 'ERR')
463
    self.assertEqual(self.done.get_nowait(), 'ERR')
464
    self.assertEqual(self.done.get_nowait(), 'ERR')
465
    self.assertEqual(self.done.get_nowait(), 'ERR')
466
    self.sl = locking.SharedLock(self.sl.name)
467

    
468
  @_Repeat
469
  def testExclusiveAcquireTimeout(self):
470
    for shared in [0, 1]:
471
      on_queue = threading.Event()
472
      release_exclusive = threading.Event()
473

    
474
      def _LockExclusive():
475
        self.sl.acquire(shared=0, test_notify=on_queue.set)
476
        self.done.put("A: start wait")
477
        release_exclusive.wait()
478
        self.done.put("A: end wait")
479
        self.sl.release()
480

    
481
      # Start thread to hold lock in exclusive mode
482
      self._addThread(target=_LockExclusive)
483

    
484
      # Wait for wait to begin
485
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
486

    
487
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
488
      # on the queue
489
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
490
                                      test_notify=release_exclusive.set))
491

    
492
      self.done.put("got 2nd")
493
      self.sl.release()
494

    
495
      self._waitThreads()
496

    
497
      self.assertEqual(self.done.get_nowait(), "A: end wait")
498
      self.assertEqual(self.done.get_nowait(), "got 2nd")
499
      self.assertRaises(Queue.Empty, self.done.get_nowait)
500

    
501
  @_Repeat
502
  def testAcquireExpiringTimeout(self):
503
    def _AcquireWithTimeout(shared, timeout):
504
      if not self.sl.acquire(shared=shared, timeout=timeout):
505
        self.done.put("timeout")
506

    
507
    for shared in [0, 1]:
508
      # Lock exclusively
509
      self.sl.acquire()
510

    
511
      # Start shared acquires with timeout between 0 and 20 ms
512
      for i in range(11):
513
        self._addThread(target=_AcquireWithTimeout,
514
                        args=(shared, i * 2.0 / 1000.0))
515

    
516
      # Wait for threads to finish (makes sure the acquire timeout expires
517
      # before releasing the lock)
518
      self._waitThreads()
519

    
520
      # Release lock
521
      self.sl.release()
522

    
523
      for _ in range(11):
524
        self.assertEqual(self.done.get_nowait(), "timeout")
525

    
526
      self.assertRaises(Queue.Empty, self.done.get_nowait)
527

    
528
  @_Repeat
529
  def testSharedSkipExclusiveAcquires(self):
530
    # Tests whether shared acquires jump in front of exclusive acquires in the
531
    # queue.
532

    
533
    def _Acquire(shared, name, notify_ev, wait_ev):
534
      if notify_ev:
535
        notify_fn = notify_ev.set
536
      else:
537
        notify_fn = None
538

    
539
      if wait_ev:
540
        wait_ev.wait()
541

    
542
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
543
        return
544

    
545
      self.done.put(name)
546
      self.sl.release()
547

    
548
    # Get exclusive lock while we fill the queue
549
    self.sl.acquire()
550

    
551
    shrcnt1 = 5
552
    shrcnt2 = 7
553
    shrcnt3 = 9
554
    shrcnt4 = 2
555

    
556
    # Add acquires using threading.Event for synchronization. They'll be
557
    # acquired exactly in the order defined in this list.
558
    acquires = (shrcnt1 * [(1, "shared 1")] +
559
                3 * [(0, "exclusive 1")] +
560
                shrcnt2 * [(1, "shared 2")] +
561
                shrcnt3 * [(1, "shared 3")] +
562
                shrcnt4 * [(1, "shared 4")] +
563
                3 * [(0, "exclusive 2")])
564

    
565
    ev_cur = None
566
    ev_prev = None
567

    
568
    for args in acquires:
569
      ev_cur = threading.Event()
570
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
571
      ev_prev = ev_cur
572

    
573
    # Wait for last acquire to start
574
    ev_prev.wait()
575

    
576
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
577
    # together
578
    self.assertEqual(self.sl._count_pending(), 7)
579

    
580
    # Release exclusive lock and wait
581
    self.sl.release()
582

    
583
    self._waitThreads()
584

    
585
    # Check sequence
586
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
587
      # Shared locks aren't guaranteed to be notified in order, but they'll be
588
      # first
589
      tmp = self.done.get_nowait()
590
      if tmp == "shared 1":
591
        shrcnt1 -= 1
592
      elif tmp == "shared 2":
593
        shrcnt2 -= 1
594
      elif tmp == "shared 3":
595
        shrcnt3 -= 1
596
      elif tmp == "shared 4":
597
        shrcnt4 -= 1
598
    self.assertEqual(shrcnt1, 0)
599
    self.assertEqual(shrcnt2, 0)
600
    self.assertEqual(shrcnt3, 0)
601
    self.assertEqual(shrcnt3, 0)
602

    
603
    for _ in range(3):
604
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
605

    
606
    for _ in range(3):
607
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
608

    
609
    self.assertRaises(Queue.Empty, self.done.get_nowait)
610

    
611
  @_Repeat
612
  def testMixedAcquireTimeout(self):
613
    sync = threading.Event()
614

    
615
    def _AcquireShared(ev):
616
      if not self.sl.acquire(shared=1, timeout=None):
617
        return
618

    
619
      self.done.put("shared")
620

    
621
      # Notify main thread
622
      ev.set()
623

    
624
      # Wait for notification from main thread
625
      sync.wait()
626

    
627
      # Release lock
628
      self.sl.release()
629

    
630
    acquires = []
631
    for _ in range(3):
632
      ev = threading.Event()
633
      self._addThread(target=_AcquireShared, args=(ev, ))
634
      acquires.append(ev)
635

    
636
    # Wait for all acquires to finish
637
    for i in acquires:
638
      i.wait()
639

    
640
    self.assertEqual(self.sl._count_pending(), 0)
641

    
642
    # Try to get exclusive lock
643
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
644

    
645
    # Acquire exclusive without timeout
646
    exclsync = threading.Event()
647
    exclev = threading.Event()
648

    
649
    def _AcquireExclusive():
650
      if not self.sl.acquire(shared=0):
651
        return
652

    
653
      self.done.put("exclusive")
654

    
655
      # Notify main thread
656
      exclev.set()
657

    
658
      # Wait for notification from main thread
659
      exclsync.wait()
660

    
661
      self.sl.release()
662

    
663
    self._addThread(target=_AcquireExclusive)
664

    
665
    # Try to get exclusive lock
666
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
667

    
668
    # Make all shared holders release their locks
669
    sync.set()
670

    
671
    # Wait for exclusive acquire to succeed
672
    exclev.wait()
673

    
674
    self.assertEqual(self.sl._count_pending(), 0)
675

    
676
    # Try to get exclusive lock
677
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
678

    
679
    def _AcquireSharedSimple():
680
      if self.sl.acquire(shared=1, timeout=None):
681
        self.done.put("shared2")
682
        self.sl.release()
683

    
684
    for _ in range(10):
685
      self._addThread(target=_AcquireSharedSimple)
686

    
687
    # Tell exclusive lock to release
688
    exclsync.set()
689

    
690
    # Wait for everything to finish
691
    self._waitThreads()
692

    
693
    self.assertEqual(self.sl._count_pending(), 0)
694

    
695
    # Check sequence
696
    for _ in range(3):
697
      self.assertEqual(self.done.get_nowait(), "shared")
698

    
699
    self.assertEqual(self.done.get_nowait(), "exclusive")
700

    
701
    for _ in range(10):
702
      self.assertEqual(self.done.get_nowait(), "shared2")
703

    
704
    self.assertRaises(Queue.Empty, self.done.get_nowait)
705

    
706
  def testPriority(self):
707
    # Acquire in exclusive mode
708
    self.assert_(self.sl.acquire(shared=0))
709

    
710
    # Queue acquires
711
    def _Acquire(prev, next, shared, priority, result):
712
      prev.wait()
713
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
714
      try:
715
        self.done.put(result)
716
      finally:
717
        self.sl.release()
718

    
719
    counter = itertools.count(0)
720
    priorities = range(-20, 30)
721
    first = threading.Event()
722
    prev = first
723

    
724
    # Data structure:
725
    # {
726
    #   priority:
727
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
728
    #      (shared/exclusive, ...),
729
    #      ...,
730
    #     ],
731
    # }
732
    perprio = {}
733

    
734
    # References shared acquire per priority in L{perprio}. Data structure:
735
    # {
736
    #   priority: (shared=1, set(acquire names), set(pending threads)),
737
    # }
738
    prioshared = {}
739

    
740
    for seed in [4979, 9523, 14902, 32440]:
741
      # Use a deterministic random generator
742
      rnd = random.Random(seed)
743
      for priority in [rnd.choice(priorities) for _ in range(30)]:
744
        modes = [0, 1]
745
        rnd.shuffle(modes)
746
        for shared in modes:
747
          # Unique name
748
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
749

    
750
          ev = threading.Event()
751
          thread = self._addThread(target=_Acquire,
752
                                   args=(prev, ev, shared, priority, acqname))
753
          prev = ev
754

    
755
          # Record expected aqcuire, see above for structure
756
          data = (shared, set([acqname]), set([thread]))
757
          priolist = perprio.setdefault(priority, [])
758
          if shared:
759
            priosh = prioshared.get(priority, None)
760
            if priosh:
761
              # Shared acquires are merged
762
              for i, j in zip(priosh[1:], data[1:]):
763
                i.update(j)
764
              assert data[0] == priosh[0]
765
            else:
766
              prioshared[priority] = data
767
              priolist.append(data)
768
          else:
769
            priolist.append(data)
770

    
771
    # Start all acquires and wait for them
772
    first.set()
773
    prev.wait()
774

    
775
    # Check lock information
776
    self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
777
    self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
778
                     ["exclusive", [threading.currentThread().getName()]])
779
    self.assertEqual(self.sl.GetInfo(["name", "pending"]),
780
                     [self.sl.name,
781
                      [(["exclusive", "shared"][int(bool(shared))],
782
                        sorted([t.getName() for t in threads]))
783
                       for acquires in [perprio[i]
784
                                        for i in sorted(perprio.keys())]
785
                       for (shared, _, threads) in acquires]])
786

    
787
    # Let threads acquire the lock
788
    self.sl.release()
789

    
790
    # Wait for everything to finish
791
    self._waitThreads()
792

    
793
    self.assert_(self.sl._check_empty())
794

    
795
    # Check acquires by priority
796
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797
      for (_, names, _) in acquires:
798
        # For shared acquires, the set will contain 1..n entries. For exclusive
799
        # acquires only one.
800
        while names:
801
          names.remove(self.done.get_nowait())
802
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
803

    
804
    self.assertRaises(Queue.Empty, self.done.get_nowait)
805

    
806

    
807
class TestSharedLockInCondition(_ThreadedTestCase):
808
  """SharedLock as a condition lock tests"""
809

    
810
  def setUp(self):
811
    _ThreadedTestCase.setUp(self)
812
    self.sl = locking.SharedLock("TestSharedLockInCondition")
813
    self.setCondition()
814

    
815
  def setCondition(self):
816
    self.cond = threading.Condition(self.sl)
817

    
818
  def testKeepMode(self):
819
    self.cond.acquire(shared=1)
820
    self.assert_(self.sl._is_owned(shared=1))
821
    self.cond.wait(0)
822
    self.assert_(self.sl._is_owned(shared=1))
823
    self.cond.release()
824
    self.cond.acquire(shared=0)
825
    self.assert_(self.sl._is_owned(shared=0))
826
    self.cond.wait(0)
827
    self.assert_(self.sl._is_owned(shared=0))
828
    self.cond.release()
829

    
830

    
831
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
832
  """SharedLock as a pipe condition lock tests"""
833

    
834
  def setCondition(self):
835
    self.cond = locking.PipeCondition(self.sl)
836

    
837

    
838
class TestSSynchronizedDecorator(_ThreadedTestCase):
839
  """Shared Lock Synchronized decorator test"""
840

    
841
  def setUp(self):
842
    _ThreadedTestCase.setUp(self)
843

    
844
  @locking.ssynchronized(_decoratorlock)
845
  def _doItExclusive(self):
846
    self.assert_(_decoratorlock._is_owned())
847
    self.done.put('EXC')
848

    
849
  @locking.ssynchronized(_decoratorlock, shared=1)
850
  def _doItSharer(self):
851
    self.assert_(_decoratorlock._is_owned(shared=1))
852
    self.done.put('SHR')
853

    
854
  def testDecoratedFunctions(self):
855
    self._doItExclusive()
856
    self.assertFalse(_decoratorlock._is_owned())
857
    self._doItSharer()
858
    self.assertFalse(_decoratorlock._is_owned())
859

    
860
  def testSharersCanCoexist(self):
861
    _decoratorlock.acquire(shared=1)
862
    threading.Thread(target=self._doItSharer).start()
863
    self.assert_(self.done.get(True, 1))
864
    _decoratorlock.release()
865

    
866
  @_Repeat
867
  def testExclusiveBlocksExclusive(self):
868
    _decoratorlock.acquire()
869
    self._addThread(target=self._doItExclusive)
870
    # give it a bit of time to check that it's not actually doing anything
871
    self.assertRaises(Queue.Empty, self.done.get_nowait)
872
    _decoratorlock.release()
873
    self._waitThreads()
874
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
875

    
876
  @_Repeat
877
  def testExclusiveBlocksSharer(self):
878
    _decoratorlock.acquire()
879
    self._addThread(target=self._doItSharer)
880
    self.assertRaises(Queue.Empty, self.done.get_nowait)
881
    _decoratorlock.release()
882
    self._waitThreads()
883
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
884

    
885
  @_Repeat
886
  def testSharerBlocksExclusive(self):
887
    _decoratorlock.acquire(shared=1)
888
    self._addThread(target=self._doItExclusive)
889
    self.assertRaises(Queue.Empty, self.done.get_nowait)
890
    _decoratorlock.release()
891
    self._waitThreads()
892
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
893

    
894

    
895
class TestLockSet(_ThreadedTestCase):
896
  """LockSet tests"""
897

    
898
  def setUp(self):
899
    _ThreadedTestCase.setUp(self)
900
    self._setUpLS()
901

    
902
  def _setUpLS(self):
903
    """Helper to (re)initialize the lock set"""
904
    self.resources = ['one', 'two', 'three']
905
    self.ls = locking.LockSet(self.resources, "TestLockSet")
906

    
907
  def testResources(self):
908
    self.assertEquals(self.ls._names(), set(self.resources))
909
    newls = locking.LockSet([], "TestLockSet.testResources")
910
    self.assertEquals(newls._names(), set())
911

    
912
  def testAcquireRelease(self):
913
    self.assert_(self.ls.acquire('one'))
914
    self.assertEquals(self.ls._list_owned(), set(['one']))
915
    self.ls.release()
916
    self.assertEquals(self.ls._list_owned(), set())
917
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
918
    self.assertEquals(self.ls._list_owned(), set(['one']))
919
    self.ls.release()
920
    self.assertEquals(self.ls._list_owned(), set())
921
    self.ls.acquire(['one', 'two', 'three'])
922
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
923
    self.ls.release('one')
924
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
925
    self.ls.release(['three'])
926
    self.assertEquals(self.ls._list_owned(), set(['two']))
927
    self.ls.release()
928
    self.assertEquals(self.ls._list_owned(), set())
929
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
930
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
931
    self.ls.release()
932
    self.assertEquals(self.ls._list_owned(), set())
933

    
934
  def testNoDoubleAcquire(self):
935
    self.ls.acquire('one')
936
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
937
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
938
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
939
    self.ls.release()
940
    self.ls.acquire(['one', 'three'])
941
    self.ls.release('one')
942
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
943
    self.ls.release('three')
944

    
945
  def testNoWrongRelease(self):
946
    self.assertRaises(AssertionError, self.ls.release)
947
    self.ls.acquire('one')
948
    self.assertRaises(AssertionError, self.ls.release, 'two')
949

    
950
  def testAddRemove(self):
951
    self.ls.add('four')
952
    self.assertEquals(self.ls._list_owned(), set())
953
    self.assert_('four' in self.ls._names())
954
    self.ls.add(['five', 'six', 'seven'], acquired=1)
955
    self.assert_('five' in self.ls._names())
956
    self.assert_('six' in self.ls._names())
957
    self.assert_('seven' in self.ls._names())
958
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
959
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
960
    self.assert_('five' not in self.ls._names())
961
    self.assert_('six' not in self.ls._names())
962
    self.assertEquals(self.ls._list_owned(), set(['seven']))
963
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
964
    self.ls.remove('seven')
965
    self.assert_('seven' not in self.ls._names())
966
    self.assertEquals(self.ls._list_owned(), set([]))
967
    self.ls.acquire(None, shared=1)
968
    self.assertRaises(AssertionError, self.ls.add, 'eight')
969
    self.ls.release()
970
    self.ls.acquire(None)
971
    self.ls.add('eight', acquired=1)
972
    self.assert_('eight' in self.ls._names())
973
    self.assert_('eight' in self.ls._list_owned())
974
    self.ls.add('nine')
975
    self.assert_('nine' in self.ls._names())
976
    self.assert_('nine' not in self.ls._list_owned())
977
    self.ls.release()
978
    self.ls.remove(['two'])
979
    self.assert_('two' not in self.ls._names())
980
    self.ls.acquire('three')
981
    self.assertEquals(self.ls.remove(['three']), ['three'])
982
    self.assert_('three' not in self.ls._names())
983
    self.assertEquals(self.ls.remove('three'), [])
984
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
985
    self.assert_('one' not in self.ls._names())
986

    
987
  def testRemoveNonBlocking(self):
988
    self.ls.acquire('one')
989
    self.assertEquals(self.ls.remove('one'), ['one'])
990
    self.ls.acquire(['two', 'three'])
991
    self.assertEquals(self.ls.remove(['two', 'three']),
992
                      ['two', 'three'])
993

    
994
  def testNoDoubleAdd(self):
995
    self.assertRaises(errors.LockError, self.ls.add, 'two')
996
    self.ls.add('four')
997
    self.assertRaises(errors.LockError, self.ls.add, 'four')
998

    
999
  def testNoWrongRemoves(self):
1000
    self.ls.acquire(['one', 'three'], shared=1)
1001
    # Cannot remove 'two' while holding something which is not a superset
1002
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1003
    # Cannot remove 'three' as we are sharing it
1004
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1005

    
1006
  def testAcquireSetLock(self):
1007
    # acquire the set-lock exclusively
1008
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1009
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1010
    self.assertEquals(self.ls._is_owned(), True)
1011
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1012
    # I can still add/remove elements...
1013
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1014
    self.assert_(self.ls.add('six'))
1015
    self.ls.release()
1016
    # share the set-lock
1017
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1018
    # adding new elements is not possible
1019
    self.assertRaises(AssertionError, self.ls.add, 'five')
1020
    self.ls.release()
1021

    
1022
  def testAcquireWithRepetitions(self):
1023
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1024
                      set(['two', 'two', 'three']))
1025
    self.ls.release(['two', 'two'])
1026
    self.assertEquals(self.ls._list_owned(), set(['three']))
1027

    
1028
  def testEmptyAcquire(self):
1029
    # Acquire an empty list of locks...
1030
    self.assertEquals(self.ls.acquire([]), set())
1031
    self.assertEquals(self.ls._list_owned(), set())
1032
    # New locks can still be addded
1033
    self.assert_(self.ls.add('six'))
1034
    # "re-acquiring" is not an issue, since we had really acquired nothing
1035
    self.assertEquals(self.ls.acquire([], shared=1), set())
1036
    self.assertEquals(self.ls._list_owned(), set())
1037
    # We haven't really acquired anything, so we cannot release
1038
    self.assertRaises(AssertionError, self.ls.release)
1039

    
1040
  def _doLockSet(self, names, shared):
1041
    try:
1042
      self.ls.acquire(names, shared=shared)
1043
      self.done.put('DONE')
1044
      self.ls.release()
1045
    except errors.LockError:
1046
      self.done.put('ERR')
1047

    
1048
  def _doAddSet(self, names):
1049
    try:
1050
      self.ls.add(names, acquired=1)
1051
      self.done.put('DONE')
1052
      self.ls.release()
1053
    except errors.LockError:
1054
      self.done.put('ERR')
1055

    
1056
  def _doRemoveSet(self, names):
1057
    self.done.put(self.ls.remove(names))
1058

    
1059
  @_Repeat
1060
  def testConcurrentSharedAcquire(self):
1061
    self.ls.acquire(['one', 'two'], shared=1)
1062
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1063
    self._waitThreads()
1064
    self.assertEqual(self.done.get_nowait(), 'DONE')
1065
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1066
    self._waitThreads()
1067
    self.assertEqual(self.done.get_nowait(), 'DONE')
1068
    self._addThread(target=self._doLockSet, args=('three', 1))
1069
    self._waitThreads()
1070
    self.assertEqual(self.done.get_nowait(), 'DONE')
1071
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1072
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1073
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1074
    self.ls.release()
1075
    self._waitThreads()
1076
    self.assertEqual(self.done.get_nowait(), 'DONE')
1077
    self.assertEqual(self.done.get_nowait(), 'DONE')
1078

    
1079
  @_Repeat
1080
  def testConcurrentExclusiveAcquire(self):
1081
    self.ls.acquire(['one', 'two'])
1082
    self._addThread(target=self._doLockSet, args=('three', 1))
1083
    self._waitThreads()
1084
    self.assertEqual(self.done.get_nowait(), 'DONE')
1085
    self._addThread(target=self._doLockSet, args=('three', 0))
1086
    self._waitThreads()
1087
    self.assertEqual(self.done.get_nowait(), 'DONE')
1088
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1089
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1090
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1091
    self._addThread(target=self._doLockSet, args=('one', 0))
1092
    self._addThread(target=self._doLockSet, args=('one', 1))
1093
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1094
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1095
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1096
    self.ls.release()
1097
    self._waitThreads()
1098
    for _ in range(6):
1099
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1100

    
1101
  @_Repeat
1102
  def testSimpleAcquireTimeoutExpiring(self):
1103
    names = sorted(self.ls._names())
1104
    self.assert_(len(names) >= 3)
1105

    
1106
    # Get name of first lock
1107
    first = names[0]
1108

    
1109
    # Get name of last lock
1110
    last = names.pop()
1111

    
1112
    checks = [
1113
      # Block first and try to lock it again
1114
      (first, first),
1115

    
1116
      # Block last and try to lock all locks
1117
      (None, first),
1118

    
1119
      # Block last and try to lock it again
1120
      (last, last),
1121
      ]
1122

    
1123
    for (wanted, block) in checks:
1124
      # Lock in exclusive mode
1125
      self.assert_(self.ls.acquire(block, shared=0))
1126

    
1127
      def _AcquireOne():
1128
        # Try to get the same lock again with a timeout (should never succeed)
1129
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1130
        if acquired:
1131
          self.done.put("acquired")
1132
          self.ls.release()
1133
        else:
1134
          self.assert_(acquired is None)
1135
          self.assertFalse(self.ls._list_owned())
1136
          self.assertFalse(self.ls._is_owned())
1137
          self.done.put("not acquired")
1138

    
1139
      self._addThread(target=_AcquireOne)
1140

    
1141
      # Wait for timeout in thread to expire
1142
      self._waitThreads()
1143

    
1144
      # Release exclusive lock again
1145
      self.ls.release()
1146

    
1147
      self.assertEqual(self.done.get_nowait(), "not acquired")
1148
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1149

    
1150
  @_Repeat
1151
  def testDelayedAndExpiringLockAcquire(self):
1152
    self._setUpLS()
1153
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1154

    
1155
    for expire in (False, True):
1156
      names = sorted(self.ls._names())
1157
      self.assertEqual(len(names), 8)
1158

    
1159
      lock_ev = dict([(i, threading.Event()) for i in names])
1160

    
1161
      # Lock all in exclusive mode
1162
      self.assert_(self.ls.acquire(names, shared=0))
1163

    
1164
      if expire:
1165
        # We'll wait at least 300ms per lock
1166
        lockwait = len(names) * [0.3]
1167

    
1168
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1169
        # this gives us up to 2.4s to fail.
1170
        lockall_timeout = 0.4
1171
      else:
1172
        # This should finish rather quickly
1173
        lockwait = None
1174
        lockall_timeout = len(names) * 5.0
1175

    
1176
      def _LockAll():
1177
        def acquire_notification(name):
1178
          if not expire:
1179
            self.done.put("getting %s" % name)
1180

    
1181
          # Kick next lock
1182
          lock_ev[name].set()
1183

    
1184
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1185
                           test_notify=acquire_notification):
1186
          self.done.put("got all")
1187
          self.ls.release()
1188
        else:
1189
          self.done.put("timeout on all")
1190

    
1191
        # Notify all locks
1192
        for ev in lock_ev.values():
1193
          ev.set()
1194

    
1195
      t = self._addThread(target=_LockAll)
1196

    
1197
      for idx, name in enumerate(names):
1198
        # Wait for actual acquire on this lock to start
1199
        lock_ev[name].wait(10.0)
1200

    
1201
        if expire and t.isAlive():
1202
          # Wait some time after getting the notification to make sure the lock
1203
          # acquire will expire
1204
          SafeSleep(lockwait[idx])
1205

    
1206
        self.ls.release(names=name)
1207

    
1208
      self.assertFalse(self.ls._list_owned())
1209

    
1210
      self._waitThreads()
1211

    
1212
      if expire:
1213
        # Not checking which locks were actually acquired. Doing so would be
1214
        # too timing-dependant.
1215
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1216
      else:
1217
        for i in names:
1218
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1219
        self.assertEqual(self.done.get_nowait(), "got all")
1220
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1221

    
1222
  @_Repeat
1223
  def testConcurrentRemove(self):
1224
    self.ls.add('four')
1225
    self.ls.acquire(['one', 'two', 'four'])
1226
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1227
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1228
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1229
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1230
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1231
    self.ls.remove('one')
1232
    self.ls.release()
1233
    self._waitThreads()
1234
    for i in range(4):
1235
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1236
    self.ls.add(['five', 'six'], acquired=1)
1237
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1238
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1239
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1240
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1241
    self.ls.remove('five')
1242
    self.ls.release()
1243
    self._waitThreads()
1244
    for i in range(4):
1245
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1246
    self.ls.acquire(['three', 'four'])
1247
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1248
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1249
    self.ls.remove('four')
1250
    self._waitThreads()
1251
    self.assertEqual(self.done.get_nowait(), ['six'])
1252
    self._addThread(target=self._doRemoveSet, args=(['two']))
1253
    self._waitThreads()
1254
    self.assertEqual(self.done.get_nowait(), ['two'])
1255
    self.ls.release()
1256
    # reset lockset
1257
    self._setUpLS()
1258

    
1259
  @_Repeat
1260
  def testConcurrentSharedSetLock(self):
1261
    # share the set-lock...
1262
    self.ls.acquire(None, shared=1)
1263
    # ...another thread can share it too
1264
    self._addThread(target=self._doLockSet, args=(None, 1))
1265
    self._waitThreads()
1266
    self.assertEqual(self.done.get_nowait(), 'DONE')
1267
    # ...or just share some elements
1268
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1269
    self._waitThreads()
1270
    self.assertEqual(self.done.get_nowait(), 'DONE')
1271
    # ...but not add new ones or remove any
1272
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1273
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1274
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1275
    # this just releases the set-lock
1276
    self.ls.release([])
1277
    t.join(60)
1278
    self.assertEqual(self.done.get_nowait(), 'DONE')
1279
    # release the lock on the actual elements so remove() can proceed too
1280
    self.ls.release()
1281
    self._waitThreads()
1282
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1283
    # reset lockset
1284
    self._setUpLS()
1285

    
1286
  @_Repeat
1287
  def testConcurrentExclusiveSetLock(self):
1288
    # acquire the set-lock...
1289
    self.ls.acquire(None, shared=0)
1290
    # ...no one can do anything else
1291
    self._addThread(target=self._doLockSet, args=(None, 1))
1292
    self._addThread(target=self._doLockSet, args=(None, 0))
1293
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1294
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1295
    self._addThread(target=self._doAddSet, args=(['nine']))
1296
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1297
    self.ls.release()
1298
    self._waitThreads()
1299
    for _ in range(5):
1300
      self.assertEqual(self.done.get(True, 1), 'DONE')
1301
    # cleanup
1302
    self._setUpLS()
1303

    
1304
  @_Repeat
1305
  def testConcurrentSetLockAdd(self):
1306
    self.ls.acquire('one')
1307
    # Another thread wants the whole SetLock
1308
    self._addThread(target=self._doLockSet, args=(None, 0))
1309
    self._addThread(target=self._doLockSet, args=(None, 1))
1310
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1311
    self.assertRaises(AssertionError, self.ls.add, 'four')
1312
    self.ls.release()
1313
    self._waitThreads()
1314
    self.assertEqual(self.done.get_nowait(), 'DONE')
1315
    self.assertEqual(self.done.get_nowait(), 'DONE')
1316
    self.ls.acquire(None)
1317
    self._addThread(target=self._doLockSet, args=(None, 0))
1318
    self._addThread(target=self._doLockSet, args=(None, 1))
1319
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1320
    self.ls.add('four')
1321
    self.ls.add('five', acquired=1)
1322
    self.ls.add('six', acquired=1, shared=1)
1323
    self.assertEquals(self.ls._list_owned(),
1324
      set(['one', 'two', 'three', 'five', 'six']))
1325
    self.assertEquals(self.ls._is_owned(), True)
1326
    self.assertEquals(self.ls._names(),
1327
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1328
    self.ls.release()
1329
    self._waitThreads()
1330
    self.assertEqual(self.done.get_nowait(), 'DONE')
1331
    self.assertEqual(self.done.get_nowait(), 'DONE')
1332
    self._setUpLS()
1333

    
1334
  @_Repeat
1335
  def testEmptyLockSet(self):
1336
    # get the set-lock
1337
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1338
    # now empty it...
1339
    self.ls.remove(['one', 'two', 'three'])
1340
    # and adds/locks by another thread still wait
1341
    self._addThread(target=self._doAddSet, args=(['nine']))
1342
    self._addThread(target=self._doLockSet, args=(None, 1))
1343
    self._addThread(target=self._doLockSet, args=(None, 0))
1344
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1345
    self.ls.release()
1346
    self._waitThreads()
1347
    for _ in range(3):
1348
      self.assertEqual(self.done.get_nowait(), 'DONE')
1349
    # empty it again...
1350
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1351
    # now share it...
1352
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1353
    # other sharers can go, adds still wait
1354
    self._addThread(target=self._doLockSet, args=(None, 1))
1355
    self._waitThreads()
1356
    self.assertEqual(self.done.get_nowait(), 'DONE')
1357
    self._addThread(target=self._doAddSet, args=(['nine']))
1358
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1359
    self.ls.release()
1360
    self._waitThreads()
1361
    self.assertEqual(self.done.get_nowait(), 'DONE')
1362
    self._setUpLS()
1363

    
1364
  def testPriority(self):
1365
    def _Acquire(prev, next, name, priority, success_fn):
1366
      prev.wait()
1367
      self.assert_(self.ls.acquire(name, shared=0,
1368
                                   priority=priority,
1369
                                   test_notify=lambda _: next.set()))
1370
      try:
1371
        success_fn()
1372
      finally:
1373
        self.ls.release()
1374

    
1375
    # Get all in exclusive mode
1376
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1377

    
1378
    done_two = Queue.Queue(0)
1379

    
1380
    first = threading.Event()
1381
    prev = first
1382

    
1383
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1384
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1385

    
1386
    # Use a deterministic random generator
1387
    random.Random(741).shuffle(acquires)
1388

    
1389
    for (name, prio, done) in acquires:
1390
      ev = threading.Event()
1391
      self._addThread(target=_Acquire,
1392
                      args=(prev, ev, name, prio,
1393
                            compat.partial(done.put, "Prio%s" % prio)))
1394
      prev = ev
1395

    
1396
    # Start acquires
1397
    first.set()
1398

    
1399
    # Wait for last acquire to start
1400
    prev.wait()
1401

    
1402
    # Let threads acquire locks
1403
    self.ls.release()
1404

    
1405
    # Wait for threads to finish
1406
    self._waitThreads()
1407

    
1408
    for i in range(1, 33):
1409
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1410
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1411

    
1412
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1413
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1414

    
1415

    
1416
class TestGanetiLockManager(_ThreadedTestCase):
1417

    
1418
  def setUp(self):
1419
    _ThreadedTestCase.setUp(self)
1420
    self.nodes=['n1', 'n2']
1421
    self.nodegroups=['g1', 'g2']
1422
    self.instances=['i1', 'i2', 'i3']
1423
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1424
                                        self.instances)
1425

    
1426
  def tearDown(self):
1427
    # Don't try this at home...
1428
    locking.GanetiLockManager._instance = None
1429

    
1430
  def testLockingConstants(self):
1431
    # The locking library internally cheats by assuming its constants have some
1432
    # relationships with each other. Check those hold true.
1433
    # This relationship is also used in the Processor to recursively acquire
1434
    # the right locks. Again, please don't break it.
1435
    for i in range(len(locking.LEVELS)):
1436
      self.assertEqual(i, locking.LEVELS[i])
1437

    
1438
  def testDoubleGLFails(self):
1439
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1440

    
1441
  def testLockNames(self):
1442
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1443
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1444
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1445
                     set(self.nodegroups))
1446
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1447
                     set(self.instances))
1448

    
1449
  def testInitAndResources(self):
1450
    locking.GanetiLockManager._instance = None
1451
    self.GL = locking.GanetiLockManager([], [], [])
1452
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1453
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1454
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1455
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1456

    
1457
    locking.GanetiLockManager._instance = None
1458
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1459
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1460
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1461
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1462
                                    set(self.nodegroups))
1463
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1464

    
1465
    locking.GanetiLockManager._instance = None
1466
    self.GL = locking.GanetiLockManager([], [], self.instances)
1467
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1468
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1469
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1470
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1471
                     set(self.instances))
1472

    
1473
  def testAcquireRelease(self):
1474
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1475
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1476
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1477
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1478
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1479
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1480
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1481
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1482
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1483
    self.GL.release(locking.LEVEL_NODE)
1484
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1485
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1486
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1487
    self.GL.release(locking.LEVEL_NODEGROUP)
1488
    self.GL.release(locking.LEVEL_INSTANCE)
1489
    self.assertRaises(errors.LockError, self.GL.acquire,
1490
                      locking.LEVEL_INSTANCE, ['i5'])
1491
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1492
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1493

    
1494
  def testAcquireWholeSets(self):
1495
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1496
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1497
                      set(self.instances))
1498
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1499
                      set(self.instances))
1500
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1501
                      set(self.nodegroups))
1502
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1503
                      set(self.nodegroups))
1504
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1505
                      set(self.nodes))
1506
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1507
                      set(self.nodes))
1508
    self.GL.release(locking.LEVEL_NODE)
1509
    self.GL.release(locking.LEVEL_NODEGROUP)
1510
    self.GL.release(locking.LEVEL_INSTANCE)
1511
    self.GL.release(locking.LEVEL_CLUSTER)
1512

    
1513
  def testAcquireWholeAndPartial(self):
1514
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1515
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1516
                      set(self.instances))
1517
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1518
                      set(self.instances))
1519
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1520
                      set(['n2']))
1521
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1522
                      set(['n2']))
1523
    self.GL.release(locking.LEVEL_NODE)
1524
    self.GL.release(locking.LEVEL_INSTANCE)
1525
    self.GL.release(locking.LEVEL_CLUSTER)
1526

    
1527
  def testBGLDependency(self):
1528
    self.assertRaises(AssertionError, self.GL.acquire,
1529
                      locking.LEVEL_NODE, ['n1', 'n2'])
1530
    self.assertRaises(AssertionError, self.GL.acquire,
1531
                      locking.LEVEL_INSTANCE, ['i3'])
1532
    self.assertRaises(AssertionError, self.GL.acquire,
1533
                      locking.LEVEL_NODEGROUP, ['g1'])
1534
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1535
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1536
    self.assertRaises(AssertionError, self.GL.release,
1537
                      locking.LEVEL_CLUSTER, ['BGL'])
1538
    self.assertRaises(AssertionError, self.GL.release,
1539
                      locking.LEVEL_CLUSTER)
1540
    self.GL.release(locking.LEVEL_NODE)
1541
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1542
    self.assertRaises(AssertionError, self.GL.release,
1543
                      locking.LEVEL_CLUSTER, ['BGL'])
1544
    self.assertRaises(AssertionError, self.GL.release,
1545
                      locking.LEVEL_CLUSTER)
1546
    self.GL.release(locking.LEVEL_INSTANCE)
1547
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1548
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1549
    self.assertRaises(AssertionError, self.GL.release,
1550
                      locking.LEVEL_CLUSTER, ['BGL'])
1551
    self.assertRaises(AssertionError, self.GL.release,
1552
                      locking.LEVEL_CLUSTER)
1553
    self.GL.release(locking.LEVEL_NODEGROUP)
1554
    self.GL.release(locking.LEVEL_CLUSTER)
1555

    
1556
  def testWrongOrder(self):
1557
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1558
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1559
    self.assertRaises(AssertionError, self.GL.acquire,
1560
                      locking.LEVEL_NODE, ['n1'])
1561
    self.assertRaises(AssertionError, self.GL.acquire,
1562
                      locking.LEVEL_NODEGROUP, ['g1'])
1563
    self.assertRaises(AssertionError, self.GL.acquire,
1564
                      locking.LEVEL_INSTANCE, ['i2'])
1565

    
1566
  def testModifiableLevels(self):
1567
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1568
                      ['BGL2'])
1569
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1570
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1571
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1572
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1573
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1574
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1575
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1576
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1577
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1578
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1579
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1580
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1581
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1582
                      ['BGL2'])
1583

    
1584
  # Helper function to run as a thread that shared the BGL and then acquires
1585
  # some locks at another level.
1586
  def _doLock(self, level, names, shared):
1587
    try:
1588
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1589
      self.GL.acquire(level, names, shared=shared)
1590
      self.done.put('DONE')
1591
      self.GL.release(level)
1592
      self.GL.release(locking.LEVEL_CLUSTER)
1593
    except errors.LockError:
1594
      self.done.put('ERR')
1595

    
1596
  @_Repeat
1597
  def testConcurrency(self):
1598
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1599
    self._addThread(target=self._doLock,
1600
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1601
    self._waitThreads()
1602
    self.assertEqual(self.done.get_nowait(), 'DONE')
1603
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1604
    self._addThread(target=self._doLock,
1605
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1606
    self._waitThreads()
1607
    self.assertEqual(self.done.get_nowait(), 'DONE')
1608
    self._addThread(target=self._doLock,
1609
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1610
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1611
    self.GL.release(locking.LEVEL_INSTANCE)
1612
    self._waitThreads()
1613
    self.assertEqual(self.done.get_nowait(), 'DONE')
1614
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1615
    self._addThread(target=self._doLock,
1616
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1617
    self._waitThreads()
1618
    self.assertEqual(self.done.get_nowait(), 'DONE')
1619
    self._addThread(target=self._doLock,
1620
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1621
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1622
    self.GL.release(locking.LEVEL_INSTANCE)
1623
    self._waitThreads()
1624
    self.assertEqual(self.done.get(True, 1), 'DONE')
1625
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1626

    
1627

    
1628
class TestLockMonitor(_ThreadedTestCase):
1629
  def setUp(self):
1630
    _ThreadedTestCase.setUp(self)
1631
    self.lm = locking.LockMonitor()
1632

    
1633
  def testSingleThread(self):
1634
    locks = []
1635

    
1636
    for i in range(100):
1637
      name = "TestLock%s" % i
1638
      locks.append(locking.SharedLock(name, monitor=self.lm))
1639

    
1640
    self.assertEqual(len(self.lm._locks), len(locks))
1641

    
1642
    self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
1643
                     100)
1644

    
1645
    # Delete all locks
1646
    del locks[:]
1647

    
1648
    # The garbage collector might needs some time
1649
    def _CheckLocks():
1650
      if self.lm._locks:
1651
        raise utils.RetryAgain()
1652

    
1653
    utils.Retry(_CheckLocks, 0.1, 30.0)
1654

    
1655
    self.assertFalse(self.lm._locks)
1656

    
1657
  def testMultiThread(self):
1658
    locks = []
1659

    
1660
    def _CreateLock(prev, next, name):
1661
      prev.wait()
1662
      locks.append(locking.SharedLock(name, monitor=self.lm))
1663
      if next:
1664
        next.set()
1665

    
1666
    expnames = []
1667

    
1668
    first = threading.Event()
1669
    prev = first
1670

    
1671
    # Use a deterministic random generator
1672
    for i in random.Random(4263).sample(range(100), 33):
1673
      name = "MtTestLock%s" % i
1674
      expnames.append(name)
1675

    
1676
      ev = threading.Event()
1677
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1678
      prev = ev
1679

    
1680
    # Add locks
1681
    first.set()
1682
    self._waitThreads()
1683

    
1684
    # Check order in which locks were added
1685
    self.assertEqual([i.name for i in locks], expnames)
1686

    
1687
    # Sync queries are not supported
1688
    self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1689

    
1690
    # Check query result
1691
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1692
                                        False),
1693
                     [[name, None, None, []]
1694
                      for name in utils.NiceSort(expnames)])
1695

    
1696
    # Test exclusive acquire
1697
    for tlock in locks[::4]:
1698
      tlock.acquire(shared=0)
1699
      try:
1700
        def _GetExpResult(name):
1701
          if tlock.name == name:
1702
            return [name, "exclusive", [threading.currentThread().getName()],
1703
                    []]
1704
          return [name, None, None, []]
1705

    
1706
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1707
                                             "pending"], False),
1708
                         [_GetExpResult(name)
1709
                          for name in utils.NiceSort(expnames)])
1710
      finally:
1711
        tlock.release()
1712

    
1713
    # Test shared acquire
1714
    def _Acquire(lock, shared, ev, notify):
1715
      lock.acquire(shared=shared)
1716
      try:
1717
        notify.set()
1718
        ev.wait()
1719
      finally:
1720
        lock.release()
1721

    
1722
    for tlock1 in locks[::11]:
1723
      for tlock2 in locks[::-15]:
1724
        if tlock2 == tlock1:
1725
          # Avoid deadlocks
1726
          continue
1727

    
1728
        for tlock3 in locks[::10]:
1729
          if tlock3 in (tlock2, tlock1):
1730
            # Avoid deadlocks
1731
            continue
1732

    
1733
          releaseev = threading.Event()
1734

    
1735
          # Acquire locks
1736
          acquireev = []
1737
          tthreads1 = []
1738
          for i in range(3):
1739
            ev = threading.Event()
1740
            tthreads1.append(self._addThread(target=_Acquire,
1741
                                             args=(tlock1, 1, releaseev, ev)))
1742
            acquireev.append(ev)
1743

    
1744
          ev = threading.Event()
1745
          tthread2 = self._addThread(target=_Acquire,
1746
                                     args=(tlock2, 1, releaseev, ev))
1747
          acquireev.append(ev)
1748

    
1749
          ev = threading.Event()
1750
          tthread3 = self._addThread(target=_Acquire,
1751
                                     args=(tlock3, 0, releaseev, ev))
1752
          acquireev.append(ev)
1753

    
1754
          # Wait for all locks to be acquired
1755
          for i in acquireev:
1756
            i.wait()
1757

    
1758
          # Check query result
1759
          for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1760
                                                         "owner"], False):
1761
            if name == tlock1.name:
1762
              self.assertEqual(mode, "shared")
1763
              self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1764
              continue
1765

    
1766
            if name == tlock2.name:
1767
              self.assertEqual(mode, "shared")
1768
              self.assertEqual(owner, [tthread2.getName()])
1769
              continue
1770

    
1771
            if name == tlock3.name:
1772
              self.assertEqual(mode, "exclusive")
1773
              self.assertEqual(owner, [tthread3.getName()])
1774
              continue
1775

    
1776
            self.assert_(name in expnames)
1777
            self.assert_(mode is None)
1778
            self.assert_(owner is None)
1779

    
1780
          # Release locks again
1781
          releaseev.set()
1782

    
1783
          self._waitThreads()
1784

    
1785
          self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1786
                           [[name, None, None]
1787
                            for name in utils.NiceSort(expnames)])
1788

    
1789
  def testDelete(self):
1790
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1791

    
1792
    self.assertEqual(len(self.lm._locks), 1)
1793
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1794
                     [[lock.name, None, None]])
1795

    
1796
    lock.delete()
1797

    
1798
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1799
                     [[lock.name, "deleted", None]])
1800
    self.assertEqual(len(self.lm._locks), 1)
1801

    
1802
  def testPending(self):
1803
    def _Acquire(lock, shared, prev, next):
1804
      prev.wait()
1805

    
1806
      lock.acquire(shared=shared, test_notify=next.set)
1807
      try:
1808
        pass
1809
      finally:
1810
        lock.release()
1811

    
1812
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
1813

    
1814
    for shared in [0, 1]:
1815
      lock.acquire()
1816
      try:
1817
        self.assertEqual(len(self.lm._locks), 1)
1818
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1819
                         [[lock.name, "exclusive",
1820
                           [threading.currentThread().getName()]]])
1821

    
1822
        threads = []
1823

    
1824
        first = threading.Event()
1825
        prev = first
1826

    
1827
        for i in range(5):
1828
          ev = threading.Event()
1829
          threads.append(self._addThread(target=_Acquire,
1830
                                          args=(lock, shared, prev, ev)))
1831
          prev = ev
1832

    
1833
        # Start acquires
1834
        first.set()
1835

    
1836
        # Wait for last acquire to start waiting
1837
        prev.wait()
1838

    
1839
        # NOTE: This works only because QueryLocks will acquire the
1840
        # lock-internal lock again and won't be able to get the information
1841
        # until it has the lock. By then the acquire should be registered in
1842
        # SharedLock.__pending (otherwise it's a bug).
1843

    
1844
        # All acquires are waiting now
1845
        if shared:
1846
          pending = [("shared", sorted([t.getName() for t in threads]))]
1847
        else:
1848
          pending = [("exclusive", [t.getName()]) for t in threads]
1849

    
1850
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1851
                                             "pending"], False),
1852
                         [[lock.name, "exclusive",
1853
                           [threading.currentThread().getName()],
1854
                           pending]])
1855

    
1856
        self.assertEqual(len(self.lm._locks), 1)
1857
      finally:
1858
        lock.release()
1859

    
1860
      self._waitThreads()
1861

    
1862
      # No pending acquires
1863
      self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1864
                                          False),
1865
                       [[lock.name, None, None, []]])
1866

    
1867
      self.assertEqual(len(self.lm._locks), 1)
1868

    
1869

    
1870
if __name__ == '__main__':
1871
  testutils.GanetiTestProgram()