Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ a95c53ea

History | View | Annotate | Download (92 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190

    
191
    self.assertTrue(repr(self.cond).startswith("<"))
192
    self.assertTrue("waiters=" in repr(self.cond))
193

    
194
    # This new thread can't acquire the lock, and thus call wait, before we
195
    # release it
196
    self._addThread(target=fn)
197
    self.cond.notifyAll()
198
    self.assertRaises(Queue.Empty, self.done.get_nowait)
199
    self.cond.release()
200

    
201
    # We should now get 3 W and 1 A (for the new thread) in whatever order
202
    w = 0
203
    a = 0
204
    for i in range(4):
205
      got = self.done.get(True, 1)
206
      if got == "W":
207
        w += 1
208
      elif got == "A":
209
        a += 1
210
      else:
211
        self.fail("Got %s on the done queue" % got)
212

    
213
    self.assertEqual(w, 3)
214
    self.assertEqual(a, 1)
215

    
216
    self.cond.acquire()
217
    self.cond.notifyAll()
218
    self.cond.release()
219
    self._waitThreads()
220
    self.assertEqual(self.done.get_nowait(), "W")
221
    self.assertRaises(Queue.Empty, self.done.get_nowait)
222

    
223
  def testBlockingWait(self):
224
    def _BlockingWait():
225
      self.cond.acquire()
226
      self.done.put("A")
227
      self.cond.wait(None)
228
      self.cond.release()
229
      self.done.put("W")
230

    
231
    self._TestWait(_BlockingWait)
232

    
233
  def testLongTimeoutWait(self):
234
    def _Helper():
235
      self.cond.acquire()
236
      self.done.put("A")
237
      self.cond.wait(15.0)
238
      self.cond.release()
239
      self.done.put("W")
240

    
241
    self._TestWait(_Helper)
242

    
243
  def _TimeoutWait(self, timeout, check):
244
    self.cond.acquire()
245
    self.cond.wait(timeout)
246
    self.cond.release()
247
    self.done.put(check)
248

    
249
  def testShortTimeoutWait(self):
250
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
251
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
252
    self._waitThreads()
253
    self.assertEqual(self.done.get_nowait(), "T1")
254
    self.assertEqual(self.done.get_nowait(), "T1")
255
    self.assertRaises(Queue.Empty, self.done.get_nowait)
256

    
257
  def testZeroTimeoutWait(self):
258
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
259
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
260
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
261
    self._waitThreads()
262
    self.assertEqual(self.done.get_nowait(), "T0")
263
    self.assertEqual(self.done.get_nowait(), "T0")
264
    self.assertEqual(self.done.get_nowait(), "T0")
265
    self.assertRaises(Queue.Empty, self.done.get_nowait)
266

    
267

    
268
class TestSharedLock(_ThreadedTestCase):
269
  """SharedLock tests"""
270

    
271
  def setUp(self):
272
    _ThreadedTestCase.setUp(self)
273
    self.sl = locking.SharedLock("TestSharedLock")
274

    
275
    self.assertTrue(repr(self.sl).startswith("<"))
276
    self.assertTrue("name=TestSharedLock" in repr(self.sl))
277

    
278
  def testSequenceAndOwnership(self):
279
    self.assertFalse(self.sl.is_owned())
280
    self.sl.acquire(shared=1)
281
    self.assert_(self.sl.is_owned())
282
    self.assert_(self.sl.is_owned(shared=1))
283
    self.assertFalse(self.sl.is_owned(shared=0))
284
    self.sl.release()
285
    self.assertFalse(self.sl.is_owned())
286
    self.sl.acquire()
287
    self.assert_(self.sl.is_owned())
288
    self.assertFalse(self.sl.is_owned(shared=1))
289
    self.assert_(self.sl.is_owned(shared=0))
290
    self.sl.release()
291
    self.assertFalse(self.sl.is_owned())
292
    self.sl.acquire(shared=1)
293
    self.assert_(self.sl.is_owned())
294
    self.assert_(self.sl.is_owned(shared=1))
295
    self.assertFalse(self.sl.is_owned(shared=0))
296
    self.sl.release()
297
    self.assertFalse(self.sl.is_owned())
298

    
299
  def testBooleanValue(self):
300
    # semaphores are supposed to return a true value on a successful acquire
301
    self.assert_(self.sl.acquire(shared=1))
302
    self.sl.release()
303
    self.assert_(self.sl.acquire())
304
    self.sl.release()
305

    
306
  def testDoubleLockingStoE(self):
307
    self.sl.acquire(shared=1)
308
    self.assertRaises(AssertionError, self.sl.acquire)
309

    
310
  def testDoubleLockingEtoS(self):
311
    self.sl.acquire()
312
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
313

    
314
  def testDoubleLockingStoS(self):
315
    self.sl.acquire(shared=1)
316
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
317

    
318
  def testDoubleLockingEtoE(self):
319
    self.sl.acquire()
320
    self.assertRaises(AssertionError, self.sl.acquire)
321

    
322
  # helper functions: called in a separate thread they acquire the lock, send
323
  # their identifier on the done queue, then release it.
324
  def _doItSharer(self):
325
    try:
326
      self.sl.acquire(shared=1)
327
      self.done.put("SHR")
328
      self.sl.release()
329
    except errors.LockError:
330
      self.done.put("ERR")
331

    
332
  def _doItExclusive(self):
333
    try:
334
      self.sl.acquire()
335
      self.done.put("EXC")
336
      self.sl.release()
337
    except errors.LockError:
338
      self.done.put("ERR")
339

    
340
  def _doItDelete(self):
341
    try:
342
      self.sl.delete()
343
      self.done.put("DEL")
344
    except errors.LockError:
345
      self.done.put("ERR")
346

    
347
  def testSharersCanCoexist(self):
348
    self.sl.acquire(shared=1)
349
    threading.Thread(target=self._doItSharer).start()
350
    self.assert_(self.done.get(True, 1))
351
    self.sl.release()
352

    
353
  @_Repeat
354
  def testExclusiveBlocksExclusive(self):
355
    self.sl.acquire()
356
    self._addThread(target=self._doItExclusive)
357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
358
    self.sl.release()
359
    self._waitThreads()
360
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
361

    
362
  @_Repeat
363
  def testExclusiveBlocksDelete(self):
364
    self.sl.acquire()
365
    self._addThread(target=self._doItDelete)
366
    self.assertRaises(Queue.Empty, self.done.get_nowait)
367
    self.sl.release()
368
    self._waitThreads()
369
    self.failUnlessEqual(self.done.get_nowait(), "DEL")
370
    self.sl = locking.SharedLock(self.sl.name)
371

    
372
  @_Repeat
373
  def testExclusiveBlocksSharer(self):
374
    self.sl.acquire()
375
    self._addThread(target=self._doItSharer)
376
    self.assertRaises(Queue.Empty, self.done.get_nowait)
377
    self.sl.release()
378
    self._waitThreads()
379
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
380

    
381
  @_Repeat
382
  def testSharerBlocksExclusive(self):
383
    self.sl.acquire(shared=1)
384
    self._addThread(target=self._doItExclusive)
385
    self.assertRaises(Queue.Empty, self.done.get_nowait)
386
    self.sl.release()
387
    self._waitThreads()
388
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
389

    
390
  @_Repeat
391
  def testSharerBlocksDelete(self):
392
    self.sl.acquire(shared=1)
393
    self._addThread(target=self._doItDelete)
394
    self.assertRaises(Queue.Empty, self.done.get_nowait)
395
    self.sl.release()
396
    self._waitThreads()
397
    self.failUnlessEqual(self.done.get_nowait(), "DEL")
398
    self.sl = locking.SharedLock(self.sl.name)
399

    
400
  @_Repeat
401
  def testWaitingExclusiveBlocksSharer(self):
402
    """SKIPPED testWaitingExclusiveBlockSharer"""
403
    return
404

    
405
    self.sl.acquire(shared=1)
406
    # the lock is acquired in shared mode...
407
    self._addThread(target=self._doItExclusive)
408
    # ...but now an exclusive is waiting...
409
    self._addThread(target=self._doItSharer)
410
    # ...so the sharer should be blocked as well
411
    self.assertRaises(Queue.Empty, self.done.get_nowait)
412
    self.sl.release()
413
    self._waitThreads()
414
    # The exclusive passed before
415
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
416
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
417

    
418
  @_Repeat
419
  def testWaitingSharerBlocksExclusive(self):
420
    """SKIPPED testWaitingSharerBlocksExclusive"""
421
    return
422

    
423
    self.sl.acquire()
424
    # the lock is acquired in exclusive mode...
425
    self._addThread(target=self._doItSharer)
426
    # ...but now a sharer is waiting...
427
    self._addThread(target=self._doItExclusive)
428
    # ...the exclusive is waiting too...
429
    self.assertRaises(Queue.Empty, self.done.get_nowait)
430
    self.sl.release()
431
    self._waitThreads()
432
    # The sharer passed before
433
    self.assertEqual(self.done.get_nowait(), "SHR")
434
    self.assertEqual(self.done.get_nowait(), "EXC")
435

    
436
  def testDelete(self):
437
    self.sl.delete()
438
    self.assertRaises(errors.LockError, self.sl.acquire)
439
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
440
    self.assertRaises(errors.LockError, self.sl.delete)
441

    
442
  def testDeleteTimeout(self):
443
    self.assertTrue(self.sl.delete(timeout=60))
444

    
445
  def testDeleteTimeoutFail(self):
446
    ready = threading.Event()
447
    finish = threading.Event()
448

    
449
    def fn():
450
      self.sl.acquire(shared=0)
451
      ready.set()
452

    
453
      finish.wait()
454
      self.sl.release()
455

    
456
    self._addThread(target=fn)
457
    ready.wait()
458

    
459
    # Test if deleting a lock owned in exclusive mode by another thread fails
460
    # to delete when a timeout is used
461
    self.assertFalse(self.sl.delete(timeout=0.02))
462

    
463
    finish.set()
464
    self._waitThreads()
465

    
466
    self.assertTrue(self.sl.delete())
467
    self.assertRaises(errors.LockError, self.sl.acquire)
468

    
469
  def testNoDeleteIfSharer(self):
470
    self.sl.acquire(shared=1)
471
    self.assertRaises(AssertionError, self.sl.delete)
472

    
473
  @_Repeat
474
  def testDeletePendingSharersExclusiveDelete(self):
475
    self.sl.acquire()
476
    self._addThread(target=self._doItSharer)
477
    self._addThread(target=self._doItSharer)
478
    self._addThread(target=self._doItExclusive)
479
    self._addThread(target=self._doItDelete)
480
    self.sl.delete()
481
    self._waitThreads()
482
    # The threads who were pending return ERR
483
    for _ in range(4):
484
      self.assertEqual(self.done.get_nowait(), "ERR")
485
    self.sl = locking.SharedLock(self.sl.name)
486

    
487
  @_Repeat
488
  def testDeletePendingDeleteExclusiveSharers(self):
489
    self.sl.acquire()
490
    self._addThread(target=self._doItDelete)
491
    self._addThread(target=self._doItExclusive)
492
    self._addThread(target=self._doItSharer)
493
    self._addThread(target=self._doItSharer)
494
    self.sl.delete()
495
    self._waitThreads()
496
    # The two threads who were pending return both ERR
497
    self.assertEqual(self.done.get_nowait(), "ERR")
498
    self.assertEqual(self.done.get_nowait(), "ERR")
499
    self.assertEqual(self.done.get_nowait(), "ERR")
500
    self.assertEqual(self.done.get_nowait(), "ERR")
501
    self.sl = locking.SharedLock(self.sl.name)
502

    
503
  @_Repeat
504
  def testExclusiveAcquireTimeout(self):
505
    for shared in [0, 1]:
506
      on_queue = threading.Event()
507
      release_exclusive = threading.Event()
508

    
509
      def _LockExclusive():
510
        self.sl.acquire(shared=0, test_notify=on_queue.set)
511
        self.done.put("A: start wait")
512
        release_exclusive.wait()
513
        self.done.put("A: end wait")
514
        self.sl.release()
515

    
516
      # Start thread to hold lock in exclusive mode
517
      self._addThread(target=_LockExclusive)
518

    
519
      # Wait for wait to begin
520
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
521

    
522
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
523
      # on the queue
524
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
525
                                      test_notify=release_exclusive.set))
526

    
527
      self.done.put("got 2nd")
528
      self.sl.release()
529

    
530
      self._waitThreads()
531

    
532
      self.assertEqual(self.done.get_nowait(), "A: end wait")
533
      self.assertEqual(self.done.get_nowait(), "got 2nd")
534
      self.assertRaises(Queue.Empty, self.done.get_nowait)
535

    
536
  @_Repeat
537
  def testAcquireExpiringTimeout(self):
538
    def _AcquireWithTimeout(shared, timeout):
539
      if not self.sl.acquire(shared=shared, timeout=timeout):
540
        self.done.put("timeout")
541

    
542
    for shared in [0, 1]:
543
      # Lock exclusively
544
      self.sl.acquire()
545

    
546
      # Start shared acquires with timeout between 0 and 20 ms
547
      for i in range(11):
548
        self._addThread(target=_AcquireWithTimeout,
549
                        args=(shared, i * 2.0 / 1000.0))
550

    
551
      # Wait for threads to finish (makes sure the acquire timeout expires
552
      # before releasing the lock)
553
      self._waitThreads()
554

    
555
      # Release lock
556
      self.sl.release()
557

    
558
      for _ in range(11):
559
        self.assertEqual(self.done.get_nowait(), "timeout")
560

    
561
      self.assertRaises(Queue.Empty, self.done.get_nowait)
562

    
563
  @_Repeat
564
  def testSharedSkipExclusiveAcquires(self):
565
    # Tests whether shared acquires jump in front of exclusive acquires in the
566
    # queue.
567

    
568
    def _Acquire(shared, name, notify_ev, wait_ev):
569
      if notify_ev:
570
        notify_fn = notify_ev.set
571
      else:
572
        notify_fn = None
573

    
574
      if wait_ev:
575
        wait_ev.wait()
576

    
577
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
578
        return
579

    
580
      self.done.put(name)
581
      self.sl.release()
582

    
583
    # Get exclusive lock while we fill the queue
584
    self.sl.acquire()
585

    
586
    shrcnt1 = 5
587
    shrcnt2 = 7
588
    shrcnt3 = 9
589
    shrcnt4 = 2
590

    
591
    # Add acquires using threading.Event for synchronization. They'll be
592
    # acquired exactly in the order defined in this list.
593
    acquires = (shrcnt1 * [(1, "shared 1")] +
594
                3 * [(0, "exclusive 1")] +
595
                shrcnt2 * [(1, "shared 2")] +
596
                shrcnt3 * [(1, "shared 3")] +
597
                shrcnt4 * [(1, "shared 4")] +
598
                3 * [(0, "exclusive 2")])
599

    
600
    ev_cur = None
601
    ev_prev = None
602

    
603
    for args in acquires:
604
      ev_cur = threading.Event()
605
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
606
      ev_prev = ev_cur
607

    
608
    # Wait for last acquire to start
609
    ev_prev.wait()
610

    
611
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
612
    # together
613
    self.assertEqual(self.sl._count_pending(), 7)
614

    
615
    # Release exclusive lock and wait
616
    self.sl.release()
617

    
618
    self._waitThreads()
619

    
620
    # Check sequence
621
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
622
      # Shared locks aren't guaranteed to be notified in order, but they'll be
623
      # first
624
      tmp = self.done.get_nowait()
625
      if tmp == "shared 1":
626
        shrcnt1 -= 1
627
      elif tmp == "shared 2":
628
        shrcnt2 -= 1
629
      elif tmp == "shared 3":
630
        shrcnt3 -= 1
631
      elif tmp == "shared 4":
632
        shrcnt4 -= 1
633
    self.assertEqual(shrcnt1, 0)
634
    self.assertEqual(shrcnt2, 0)
635
    self.assertEqual(shrcnt3, 0)
636
    self.assertEqual(shrcnt3, 0)
637

    
638
    for _ in range(3):
639
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
640

    
641
    for _ in range(3):
642
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
643

    
644
    self.assertRaises(Queue.Empty, self.done.get_nowait)
645

    
646
  def testIllegalDowngrade(self):
647
    # Not yet acquired
648
    self.assertRaises(AssertionError, self.sl.downgrade)
649

    
650
    # Acquire in shared mode, downgrade should be no-op
651
    self.assertTrue(self.sl.acquire(shared=1))
652
    self.assertTrue(self.sl.is_owned(shared=1))
653
    self.assertTrue(self.sl.downgrade())
654
    self.assertTrue(self.sl.is_owned(shared=1))
655
    self.sl.release()
656

    
657
  def testDowngrade(self):
658
    self.assertTrue(self.sl.acquire())
659
    self.assertTrue(self.sl.is_owned(shared=0))
660
    self.assertTrue(self.sl.downgrade())
661
    self.assertTrue(self.sl.is_owned(shared=1))
662
    self.sl.release()
663

    
664
  @_Repeat
665
  def testDowngradeJumpsAheadOfExclusive(self):
666
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
667
      self.assertTrue(self.sl.acquire())
668
      self.assertTrue(self.sl.is_owned(shared=0))
669
      ev_got.set()
670
      ev_downgrade.wait()
671
      self.assertTrue(self.sl.is_owned(shared=0))
672
      self.assertTrue(self.sl.downgrade())
673
      self.assertTrue(self.sl.is_owned(shared=1))
674
      ev_release.wait()
675
      self.assertTrue(self.sl.is_owned(shared=1))
676
      self.sl.release()
677

    
678
    def _KeepExclusive2(ev_started, ev_release):
679
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
680
      self.assertTrue(self.sl.is_owned(shared=0))
681
      ev_release.wait()
682
      self.assertTrue(self.sl.is_owned(shared=0))
683
      self.sl.release()
684

    
685
    def _KeepShared(ev_started, ev_got, ev_release):
686
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
687
      self.assertTrue(self.sl.is_owned(shared=1))
688
      ev_got.set()
689
      ev_release.wait()
690
      self.assertTrue(self.sl.is_owned(shared=1))
691
      self.sl.release()
692

    
693
    # Acquire lock in exclusive mode
694
    ev_got_excl1 = threading.Event()
695
    ev_downgrade_excl1 = threading.Event()
696
    ev_release_excl1 = threading.Event()
697
    th_excl1 = self._addThread(target=_KeepExclusive,
698
                               args=(ev_got_excl1, ev_downgrade_excl1,
699
                                     ev_release_excl1))
700
    ev_got_excl1.wait()
701

    
702
    # Start a second exclusive acquire
703
    ev_started_excl2 = threading.Event()
704
    ev_release_excl2 = threading.Event()
705
    th_excl2 = self._addThread(target=_KeepExclusive2,
706
                               args=(ev_started_excl2, ev_release_excl2))
707
    ev_started_excl2.wait()
708

    
709
    # Start shared acquires, will jump ahead of second exclusive acquire when
710
    # first exclusive acquire downgrades
711
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
712
    ev_release_shared = threading.Event()
713

    
714
    th_shared = [self._addThread(target=_KeepShared,
715
                                 args=(ev_started, ev_got, ev_release_shared))
716
                 for (ev_started, ev_got) in ev_shared]
717

    
718
    # Wait for all shared acquires to start
719
    for (ev, _) in ev_shared:
720
      ev.wait()
721

    
722
    # Check lock information
723
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
724
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
725
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
726
    self.assertEqual([(pendmode, sorted(waiting))
727
                      for (pendmode, waiting) in pending],
728
                     [("exclusive", [th_excl2.getName()]),
729
                      ("shared", sorted(th.getName() for th in th_shared))])
730

    
731
    # Shared acquires won't start until the exclusive lock is downgraded
732
    ev_downgrade_excl1.set()
733

    
734
    # Wait for all shared acquires to be successful
735
    for (_, ev) in ev_shared:
736
      ev.wait()
737

    
738
    # Check lock information again
739
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
740
                                              query.LQ_PENDING])),
741
                     [(self.sl.name, "shared", None,
742
                       [("exclusive", [th_excl2.getName()])])])
743
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
744
    self.assertEqual(set(owner), set([th_excl1.getName()] +
745
                                     [th.getName() for th in th_shared]))
746

    
747
    ev_release_excl1.set()
748
    ev_release_excl2.set()
749
    ev_release_shared.set()
750

    
751
    self._waitThreads()
752

    
753
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
754
                                              query.LQ_PENDING])),
755
                     [(self.sl.name, None, None, [])])
756

    
757
  @_Repeat
758
  def testMixedAcquireTimeout(self):
759
    sync = threading.Event()
760

    
761
    def _AcquireShared(ev):
762
      if not self.sl.acquire(shared=1, timeout=None):
763
        return
764

    
765
      self.done.put("shared")
766

    
767
      # Notify main thread
768
      ev.set()
769

    
770
      # Wait for notification from main thread
771
      sync.wait()
772

    
773
      # Release lock
774
      self.sl.release()
775

    
776
    acquires = []
777
    for _ in range(3):
778
      ev = threading.Event()
779
      self._addThread(target=_AcquireShared, args=(ev, ))
780
      acquires.append(ev)
781

    
782
    # Wait for all acquires to finish
783
    for i in acquires:
784
      i.wait()
785

    
786
    self.assertEqual(self.sl._count_pending(), 0)
787

    
788
    # Try to get exclusive lock
789
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
790

    
791
    # Acquire exclusive without timeout
792
    exclsync = threading.Event()
793
    exclev = threading.Event()
794

    
795
    def _AcquireExclusive():
796
      if not self.sl.acquire(shared=0):
797
        return
798

    
799
      self.done.put("exclusive")
800

    
801
      # Notify main thread
802
      exclev.set()
803

    
804
      # Wait for notification from main thread
805
      exclsync.wait()
806

    
807
      self.sl.release()
808

    
809
    self._addThread(target=_AcquireExclusive)
810

    
811
    # Try to get exclusive lock
812
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
813

    
814
    # Make all shared holders release their locks
815
    sync.set()
816

    
817
    # Wait for exclusive acquire to succeed
818
    exclev.wait()
819

    
820
    self.assertEqual(self.sl._count_pending(), 0)
821

    
822
    # Try to get exclusive lock
823
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
824

    
825
    def _AcquireSharedSimple():
826
      if self.sl.acquire(shared=1, timeout=None):
827
        self.done.put("shared2")
828
        self.sl.release()
829

    
830
    for _ in range(10):
831
      self._addThread(target=_AcquireSharedSimple)
832

    
833
    # Tell exclusive lock to release
834
    exclsync.set()
835

    
836
    # Wait for everything to finish
837
    self._waitThreads()
838

    
839
    self.assertEqual(self.sl._count_pending(), 0)
840

    
841
    # Check sequence
842
    for _ in range(3):
843
      self.assertEqual(self.done.get_nowait(), "shared")
844

    
845
    self.assertEqual(self.done.get_nowait(), "exclusive")
846

    
847
    for _ in range(10):
848
      self.assertEqual(self.done.get_nowait(), "shared2")
849

    
850
    self.assertRaises(Queue.Empty, self.done.get_nowait)
851

    
852
  def testPriority(self):
853
    # Acquire in exclusive mode
854
    self.assert_(self.sl.acquire(shared=0))
855

    
856
    # Queue acquires
857
    def _Acquire(prev, next, shared, priority, result):
858
      prev.wait()
859
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
860
      try:
861
        self.done.put(result)
862
      finally:
863
        self.sl.release()
864

    
865
    counter = itertools.count(0)
866
    priorities = range(-20, 30)
867
    first = threading.Event()
868
    prev = first
869

    
870
    # Data structure:
871
    # {
872
    #   priority:
873
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
874
    #      (shared/exclusive, ...),
875
    #      ...,
876
    #     ],
877
    # }
878
    perprio = {}
879

    
880
    # References shared acquire per priority in L{perprio}. Data structure:
881
    # {
882
    #   priority: (shared=1, set(acquire names), set(pending threads)),
883
    # }
884
    prioshared = {}
885

    
886
    for seed in [4979, 9523, 14902, 32440]:
887
      # Use a deterministic random generator
888
      rnd = random.Random(seed)
889
      for priority in [rnd.choice(priorities) for _ in range(30)]:
890
        modes = [0, 1]
891
        rnd.shuffle(modes)
892
        for shared in modes:
893
          # Unique name
894
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
895

    
896
          ev = threading.Event()
897
          thread = self._addThread(target=_Acquire,
898
                                   args=(prev, ev, shared, priority, acqname))
899
          prev = ev
900

    
901
          # Record expected aqcuire, see above for structure
902
          data = (shared, set([acqname]), set([thread]))
903
          priolist = perprio.setdefault(priority, [])
904
          if shared:
905
            priosh = prioshared.get(priority, None)
906
            if priosh:
907
              # Shared acquires are merged
908
              for i, j in zip(priosh[1:], data[1:]):
909
                i.update(j)
910
              assert data[0] == priosh[0]
911
            else:
912
              prioshared[priority] = data
913
              priolist.append(data)
914
          else:
915
            priolist.append(data)
916

    
917
    # Start all acquires and wait for them
918
    first.set()
919
    prev.wait()
920

    
921
    # Check lock information
922
    self.assertEqual(self.sl.GetLockInfo(set()),
923
                     [(self.sl.name, None, None, None)])
924
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
925
                     [(self.sl.name, "exclusive",
926
                       [threading.currentThread().getName()], None)])
927

    
928
    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
929
                            perprio)
930

    
931
    # Let threads acquire the lock
932
    self.sl.release()
933

    
934
    # Wait for everything to finish
935
    self._waitThreads()
936

    
937
    self.assert_(self.sl._check_empty())
938

    
939
    # Check acquires by priority
940
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
941
      for (_, names, _) in acquires:
942
        # For shared acquires, the set will contain 1..n entries. For exclusive
943
        # acquires only one.
944
        while names:
945
          names.remove(self.done.get_nowait())
946
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
947

    
948
    self.assertRaises(Queue.Empty, self.done.get_nowait)
949

    
950
  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
951
    self.assertEqual(name, self.sl.name)
952
    self.assert_(mode is None)
953
    self.assert_(owner is None)
954

    
955
    self.assertEqual([(pendmode, sorted(waiting))
956
                      for (pendmode, waiting) in pending],
957
                     [(["exclusive", "shared"][int(bool(shared))],
958
                       sorted(t.getName() for t in threads))
959
                      for acquires in [perprio[i]
960
                                       for i in sorted(perprio.keys())]
961
                      for (shared, _, threads) in acquires])
962

    
963
  class _FakeTimeForSpuriousNotifications:
964
    def __init__(self, now, check_end):
965
      self.now = now
966
      self.check_end = check_end
967

    
968
      # Deterministic random number generator
969
      self.rnd = random.Random(15086)
970

    
971
    def time(self):
972
      # Advance time if the random number generator thinks so (this is to test
973
      # multiple notifications without advancing the time)
974
      if self.rnd.random() < 0.3:
975
        self.now += self.rnd.random()
976

    
977
      self.check_end(self.now)
978

    
979
      return self.now
980

    
981
  @_Repeat
982
  def testAcquireTimeoutWithSpuriousNotifications(self):
983
    ready = threading.Event()
984
    locked = threading.Event()
985
    req = Queue.Queue(0)
986

    
987
    epoch = 4000.0
988
    timeout = 60.0
989

    
990
    def check_end(now):
991
      self.assertFalse(locked.isSet())
992

    
993
      # If we waited long enough (in virtual time), tell main thread to release
994
      # lock, otherwise tell it to notify once more
995
      req.put(now < (epoch + (timeout * 0.8)))
996

    
997
    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
998

    
999
    sl = locking.SharedLock("test", _time_fn=time_fn)
1000

    
1001
    # Acquire in exclusive mode
1002
    sl.acquire(shared=0)
1003

    
1004
    def fn():
1005
      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
1006
                                 test_notify=ready.set))
1007
      locked.set()
1008
      sl.release()
1009
      self.done.put("success")
1010

    
1011
    # Start acquire with timeout and wait for it to be ready
1012
    self._addThread(target=fn)
1013
    ready.wait()
1014

    
1015
    # The separate thread is now waiting to acquire the lock, so start sending
1016
    # spurious notifications.
1017

    
1018
    # Wait for separate thread to ask for another notification
1019
    count = 0
1020
    while req.get():
1021
      # After sending the notification, the lock will take a short amount of
1022
      # time to notice and to retrieve the current time
1023
      sl._notify_topmost()
1024
      count += 1
1025

    
1026
    self.assertTrue(count > 100, "Not enough notifications were sent")
1027

    
1028
    self.assertFalse(locked.isSet())
1029

    
1030
    # Some notifications have been sent, now actually release the lock
1031
    sl.release()
1032

    
1033
    # Wait for lock to be acquired
1034
    locked.wait()
1035

    
1036
    self._waitThreads()
1037

    
1038
    self.assertEqual(self.done.get_nowait(), "success")
1039
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1040

    
1041

    
1042
class TestSharedLockInCondition(_ThreadedTestCase):
1043
  """SharedLock as a condition lock tests"""
1044

    
1045
  def setUp(self):
1046
    _ThreadedTestCase.setUp(self)
1047
    self.sl = locking.SharedLock("TestSharedLockInCondition")
1048
    self.setCondition()
1049

    
1050
  def setCondition(self):
1051
    self.cond = threading.Condition(self.sl)
1052

    
1053
  def testKeepMode(self):
1054
    self.cond.acquire(shared=1)
1055
    self.assert_(self.sl.is_owned(shared=1))
1056
    self.cond.wait(0)
1057
    self.assert_(self.sl.is_owned(shared=1))
1058
    self.cond.release()
1059
    self.cond.acquire(shared=0)
1060
    self.assert_(self.sl.is_owned(shared=0))
1061
    self.cond.wait(0)
1062
    self.assert_(self.sl.is_owned(shared=0))
1063
    self.cond.release()
1064

    
1065

    
1066
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
1067
  """SharedLock as a pipe condition lock tests"""
1068

    
1069
  def setCondition(self):
1070
    self.cond = locking.PipeCondition(self.sl)
1071

    
1072

    
1073
class TestSSynchronizedDecorator(_ThreadedTestCase):
1074
  """Shared Lock Synchronized decorator test"""
1075

    
1076
  def setUp(self):
1077
    _ThreadedTestCase.setUp(self)
1078

    
1079
  @locking.ssynchronized(_decoratorlock)
1080
  def _doItExclusive(self):
1081
    self.assert_(_decoratorlock.is_owned())
1082
    self.done.put("EXC")
1083

    
1084
  @locking.ssynchronized(_decoratorlock, shared=1)
1085
  def _doItSharer(self):
1086
    self.assert_(_decoratorlock.is_owned(shared=1))
1087
    self.done.put("SHR")
1088

    
1089
  def testDecoratedFunctions(self):
1090
    self._doItExclusive()
1091
    self.assertFalse(_decoratorlock.is_owned())
1092
    self._doItSharer()
1093
    self.assertFalse(_decoratorlock.is_owned())
1094

    
1095
  def testSharersCanCoexist(self):
1096
    _decoratorlock.acquire(shared=1)
1097
    threading.Thread(target=self._doItSharer).start()
1098
    self.assert_(self.done.get(True, 1))
1099
    _decoratorlock.release()
1100

    
1101
  @_Repeat
1102
  def testExclusiveBlocksExclusive(self):
1103
    _decoratorlock.acquire()
1104
    self._addThread(target=self._doItExclusive)
1105
    # give it a bit of time to check that it's not actually doing anything
1106
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1107
    _decoratorlock.release()
1108
    self._waitThreads()
1109
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
1110

    
1111
  @_Repeat
1112
  def testExclusiveBlocksSharer(self):
1113
    _decoratorlock.acquire()
1114
    self._addThread(target=self._doItSharer)
1115
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1116
    _decoratorlock.release()
1117
    self._waitThreads()
1118
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
1119

    
1120
  @_Repeat
1121
  def testSharerBlocksExclusive(self):
1122
    _decoratorlock.acquire(shared=1)
1123
    self._addThread(target=self._doItExclusive)
1124
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1125
    _decoratorlock.release()
1126
    self._waitThreads()
1127
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
1128

    
1129

    
1130
class TestLockSet(_ThreadedTestCase):
1131
  """LockSet tests"""
1132

    
1133
  def setUp(self):
1134
    _ThreadedTestCase.setUp(self)
1135
    self._setUpLS()
1136

    
1137
  def _setUpLS(self):
1138
    """Helper to (re)initialize the lock set"""
1139
    self.resources = ["one", "two", "three"]
1140
    self.ls = locking.LockSet(self.resources, "TestLockSet")
1141

    
1142
  def testResources(self):
1143
    self.assertEquals(self.ls._names(), set(self.resources))
1144
    newls = locking.LockSet([], "TestLockSet.testResources")
1145
    self.assertEquals(newls._names(), set())
1146

    
1147
  def testCheckOwnedUnknown(self):
1148
    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1149
    for shared in [-1, 0, 1, 6378, 24255]:
1150
      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1151
                                           shared=shared))
1152

    
1153
  def testCheckOwnedUnknownWhileHolding(self):
1154
    self.assertFalse(self.ls.check_owned([]))
1155
    self.ls.acquire("one", shared=1)
1156
    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1157
    self.assertTrue(self.ls.check_owned("one", shared=1))
1158
    self.assertFalse(self.ls.check_owned("one", shared=0))
1159
    self.assertFalse(self.ls.check_owned(["one", "two"]))
1160
    self.assertRaises(errors.LockError, self.ls.check_owned,
1161
                      ["one", "nonexist"])
1162
    self.assertRaises(errors.LockError, self.ls.check_owned, "")
1163
    self.ls.release()
1164
    self.assertFalse(self.ls.check_owned([]))
1165
    self.assertFalse(self.ls.check_owned("one"))
1166

    
1167
  def testAcquireRelease(self):
1168
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1169
    self.assert_(self.ls.acquire("one"))
1170
    self.assertEquals(self.ls.list_owned(), set(["one"]))
1171
    self.assertTrue(self.ls.check_owned("one"))
1172
    self.assertTrue(self.ls.check_owned("one", shared=0))
1173
    self.assertFalse(self.ls.check_owned("one", shared=1))
1174
    self.ls.release()
1175
    self.assertEquals(self.ls.list_owned(), set())
1176
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1177
    self.assertEquals(self.ls.acquire(["one"]), set(["one"]))
1178
    self.assertEquals(self.ls.list_owned(), set(["one"]))
1179
    self.ls.release()
1180
    self.assertEquals(self.ls.list_owned(), set())
1181
    self.ls.acquire(["one", "two", "three"])
1182
    self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1183
    self.assertTrue(self.ls.check_owned(self.ls._names()))
1184
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1185
    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1186
    self.ls.release("one")
1187
    self.assertFalse(self.ls.check_owned(["one"]))
1188
    self.assertTrue(self.ls.check_owned(["two", "three"]))
1189
    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1190
    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1191
    self.assertEquals(self.ls.list_owned(), set(["two", "three"]))
1192
    self.ls.release(["three"])
1193
    self.assertEquals(self.ls.list_owned(), set(["two"]))
1194
    self.ls.release()
1195
    self.assertEquals(self.ls.list_owned(), set())
1196
    self.assertEquals(self.ls.acquire(["one", "three"]), set(["one", "three"]))
1197
    self.assertEquals(self.ls.list_owned(), set(["one", "three"]))
1198
    self.ls.release()
1199
    self.assertEquals(self.ls.list_owned(), set())
1200
    for name in self.ls._names():
1201
      self.assertFalse(self.ls.check_owned(name))
1202

    
1203
  def testNoDoubleAcquire(self):
1204
    self.ls.acquire("one")
1205
    self.assertRaises(AssertionError, self.ls.acquire, "one")
1206
    self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1207
    self.assertRaises(AssertionError, self.ls.acquire, ["two", "three"])
1208
    self.ls.release()
1209
    self.ls.acquire(["one", "three"])
1210
    self.ls.release("one")
1211
    self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1212
    self.ls.release("three")
1213

    
1214
  def testNoWrongRelease(self):
1215
    self.assertRaises(AssertionError, self.ls.release)
1216
    self.ls.acquire("one")
1217
    self.assertRaises(AssertionError, self.ls.release, "two")
1218

    
1219
  def testAddRemove(self):
1220
    self.ls.add("four")
1221
    self.assertEquals(self.ls.list_owned(), set())
1222
    self.assert_("four" in self.ls._names())
1223
    self.ls.add(["five", "six", "seven"], acquired=1)
1224
    self.assert_("five" in self.ls._names())
1225
    self.assert_("six" in self.ls._names())
1226
    self.assert_("seven" in self.ls._names())
1227
    self.assertEquals(self.ls.list_owned(), set(["five", "six", "seven"]))
1228
    self.assertEquals(self.ls.remove(["five", "six"]), ["five", "six"])
1229
    self.assert_("five" not in self.ls._names())
1230
    self.assert_("six" not in self.ls._names())
1231
    self.assertEquals(self.ls.list_owned(), set(["seven"]))
1232
    self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
1233
    self.ls.remove("seven")
1234
    self.assert_("seven" not in self.ls._names())
1235
    self.assertEquals(self.ls.list_owned(), set([]))
1236
    self.ls.acquire(None, shared=1)
1237
    self.assertRaises(AssertionError, self.ls.add, "eight")
1238
    self.ls.release()
1239
    self.ls.acquire(None)
1240
    self.ls.add("eight", acquired=1)
1241
    self.assert_("eight" in self.ls._names())
1242
    self.assert_("eight" in self.ls.list_owned())
1243
    self.ls.add("nine")
1244
    self.assert_("nine" in self.ls._names())
1245
    self.assert_("nine" not in self.ls.list_owned())
1246
    self.ls.release()
1247
    self.ls.remove(["two"])
1248
    self.assert_("two" not in self.ls._names())
1249
    self.ls.acquire("three")
1250
    self.assertEquals(self.ls.remove(["three"]), ["three"])
1251
    self.assert_("three" not in self.ls._names())
1252
    self.assertEquals(self.ls.remove("three"), [])
1253
    self.assertEquals(self.ls.remove(["one", "three", "six"]), ["one"])
1254
    self.assert_("one" not in self.ls._names())
1255

    
1256
  def testRemoveNonBlocking(self):
1257
    self.ls.acquire("one")
1258
    self.assertEquals(self.ls.remove("one"), ["one"])
1259
    self.ls.acquire(["two", "three"])
1260
    self.assertEquals(self.ls.remove(["two", "three"]),
1261
                      ["two", "three"])
1262

    
1263
  def testNoDoubleAdd(self):
1264
    self.assertRaises(errors.LockError, self.ls.add, "two")
1265
    self.ls.add("four")
1266
    self.assertRaises(errors.LockError, self.ls.add, "four")
1267

    
1268
  def testNoWrongRemoves(self):
1269
    self.ls.acquire(["one", "three"], shared=1)
1270
    # Cannot remove "two" while holding something which is not a superset
1271
    self.assertRaises(AssertionError, self.ls.remove, "two")
1272
    # Cannot remove "three" as we are sharing it
1273
    self.assertRaises(AssertionError, self.ls.remove, "three")
1274

    
1275
  def testAcquireSetLock(self):
1276
    # acquire the set-lock exclusively
1277
    self.assertEquals(self.ls.acquire(None), set(["one", "two", "three"]))
1278
    self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1279
    self.assertEquals(self.ls.is_owned(), True)
1280
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1281
    # I can still add/remove elements...
1282
    self.assertEquals(self.ls.remove(["two", "three"]), ["two", "three"])
1283
    self.assert_(self.ls.add("six"))
1284
    self.ls.release()
1285
    # share the set-lock
1286
    self.assertEquals(self.ls.acquire(None, shared=1), set(["one", "six"]))
1287
    # adding new elements is not possible
1288
    self.assertRaises(AssertionError, self.ls.add, "five")
1289
    self.ls.release()
1290

    
1291
  def testAcquireWithRepetitions(self):
1292
    self.assertEquals(self.ls.acquire(["two", "two", "three"], shared=1),
1293
                      set(["two", "two", "three"]))
1294
    self.ls.release(["two", "two"])
1295
    self.assertEquals(self.ls.list_owned(), set(["three"]))
1296

    
1297
  def testEmptyAcquire(self):
1298
    # Acquire an empty list of locks...
1299
    self.assertEquals(self.ls.acquire([]), set())
1300
    self.assertEquals(self.ls.list_owned(), set())
1301
    # New locks can still be addded
1302
    self.assert_(self.ls.add("six"))
1303
    # "re-acquiring" is not an issue, since we had really acquired nothing
1304
    self.assertEquals(self.ls.acquire([], shared=1), set())
1305
    self.assertEquals(self.ls.list_owned(), set())
1306
    # We haven't really acquired anything, so we cannot release
1307
    self.assertRaises(AssertionError, self.ls.release)
1308

    
1309
  def _doLockSet(self, names, shared):
1310
    try:
1311
      self.ls.acquire(names, shared=shared)
1312
      self.done.put("DONE")
1313
      self.ls.release()
1314
    except errors.LockError:
1315
      self.done.put("ERR")
1316

    
1317
  def _doAddSet(self, names):
1318
    try:
1319
      self.ls.add(names, acquired=1)
1320
      self.done.put("DONE")
1321
      self.ls.release()
1322
    except errors.LockError:
1323
      self.done.put("ERR")
1324

    
1325
  def _doRemoveSet(self, names):
1326
    self.done.put(self.ls.remove(names))
1327

    
1328
  @_Repeat
1329
  def testConcurrentSharedAcquire(self):
1330
    self.ls.acquire(["one", "two"], shared=1)
1331
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1332
    self._waitThreads()
1333
    self.assertEqual(self.done.get_nowait(), "DONE")
1334
    self._addThread(target=self._doLockSet, args=(["one", "two", "three"], 1))
1335
    self._waitThreads()
1336
    self.assertEqual(self.done.get_nowait(), "DONE")
1337
    self._addThread(target=self._doLockSet, args=("three", 1))
1338
    self._waitThreads()
1339
    self.assertEqual(self.done.get_nowait(), "DONE")
1340
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1341
    self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1342
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1343
    self.ls.release()
1344
    self._waitThreads()
1345
    self.assertEqual(self.done.get_nowait(), "DONE")
1346
    self.assertEqual(self.done.get_nowait(), "DONE")
1347

    
1348
  @_Repeat
1349
  def testConcurrentExclusiveAcquire(self):
1350
    self.ls.acquire(["one", "two"])
1351
    self._addThread(target=self._doLockSet, args=("three", 1))
1352
    self._waitThreads()
1353
    self.assertEqual(self.done.get_nowait(), "DONE")
1354
    self._addThread(target=self._doLockSet, args=("three", 0))
1355
    self._waitThreads()
1356
    self.assertEqual(self.done.get_nowait(), "DONE")
1357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1358
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1359
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1360
    self._addThread(target=self._doLockSet, args=("one", 0))
1361
    self._addThread(target=self._doLockSet, args=("one", 1))
1362
    self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1363
    self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
1364
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1365
    self.ls.release()
1366
    self._waitThreads()
1367
    for _ in range(6):
1368
      self.failUnlessEqual(self.done.get_nowait(), "DONE")
1369

    
1370
  @_Repeat
1371
  def testSimpleAcquireTimeoutExpiring(self):
1372
    names = sorted(self.ls._names())
1373
    self.assert_(len(names) >= 3)
1374

    
1375
    # Get name of first lock
1376
    first = names[0]
1377

    
1378
    # Get name of last lock
1379
    last = names.pop()
1380

    
1381
    checks = [
1382
      # Block first and try to lock it again
1383
      (first, first),
1384

    
1385
      # Block last and try to lock all locks
1386
      (None, first),
1387

    
1388
      # Block last and try to lock it again
1389
      (last, last),
1390
      ]
1391

    
1392
    for (wanted, block) in checks:
1393
      # Lock in exclusive mode
1394
      self.assert_(self.ls.acquire(block, shared=0))
1395

    
1396
      def _AcquireOne():
1397
        # Try to get the same lock again with a timeout (should never succeed)
1398
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1399
        if acquired:
1400
          self.done.put("acquired")
1401
          self.ls.release()
1402
        else:
1403
          self.assert_(acquired is None)
1404
          self.assertFalse(self.ls.list_owned())
1405
          self.assertFalse(self.ls.is_owned())
1406
          self.done.put("not acquired")
1407

    
1408
      self._addThread(target=_AcquireOne)
1409

    
1410
      # Wait for timeout in thread to expire
1411
      self._waitThreads()
1412

    
1413
      # Release exclusive lock again
1414
      self.ls.release()
1415

    
1416
      self.assertEqual(self.done.get_nowait(), "not acquired")
1417
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1418

    
1419
  @_Repeat
1420
  def testDelayedAndExpiringLockAcquire(self):
1421
    self._setUpLS()
1422
    self.ls.add(["five", "six", "seven", "eight", "nine"])
1423

    
1424
    for expire in (False, True):
1425
      names = sorted(self.ls._names())
1426
      self.assertEqual(len(names), 8)
1427

    
1428
      lock_ev = dict([(i, threading.Event()) for i in names])
1429

    
1430
      # Lock all in exclusive mode
1431
      self.assert_(self.ls.acquire(names, shared=0))
1432

    
1433
      if expire:
1434
        # We'll wait at least 300ms per lock
1435
        lockwait = len(names) * [0.3]
1436

    
1437
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1438
        # this gives us up to 2.4s to fail.
1439
        lockall_timeout = 0.4
1440
      else:
1441
        # This should finish rather quickly
1442
        lockwait = None
1443
        lockall_timeout = len(names) * 5.0
1444

    
1445
      def _LockAll():
1446
        def acquire_notification(name):
1447
          if not expire:
1448
            self.done.put("getting %s" % name)
1449

    
1450
          # Kick next lock
1451
          lock_ev[name].set()
1452

    
1453
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1454
                           test_notify=acquire_notification):
1455
          self.done.put("got all")
1456
          self.ls.release()
1457
        else:
1458
          self.done.put("timeout on all")
1459

    
1460
        # Notify all locks
1461
        for ev in lock_ev.values():
1462
          ev.set()
1463

    
1464
      t = self._addThread(target=_LockAll)
1465

    
1466
      for idx, name in enumerate(names):
1467
        # Wait for actual acquire on this lock to start
1468
        lock_ev[name].wait(10.0)
1469

    
1470
        if expire and t.isAlive():
1471
          # Wait some time after getting the notification to make sure the lock
1472
          # acquire will expire
1473
          SafeSleep(lockwait[idx])
1474

    
1475
        self.ls.release(names=name)
1476

    
1477
      self.assertFalse(self.ls.list_owned())
1478

    
1479
      self._waitThreads()
1480

    
1481
      if expire:
1482
        # Not checking which locks were actually acquired. Doing so would be
1483
        # too timing-dependant.
1484
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1485
      else:
1486
        for i in names:
1487
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1488
        self.assertEqual(self.done.get_nowait(), "got all")
1489
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1490

    
1491
  @_Repeat
1492
  def testConcurrentRemove(self):
1493
    self.ls.add("four")
1494
    self.ls.acquire(["one", "two", "four"])
1495
    self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
1496
    self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
1497
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1498
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1499
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1500
    self.ls.remove("one")
1501
    self.ls.release()
1502
    self._waitThreads()
1503
    for i in range(4):
1504
      self.failUnlessEqual(self.done.get_nowait(), "ERR")
1505
    self.ls.add(["five", "six"], acquired=1)
1506
    self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
1507
    self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
1508
    self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
1509
    self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
1510
    self.ls.remove("five")
1511
    self.ls.release()
1512
    self._waitThreads()
1513
    for i in range(4):
1514
      self.failUnlessEqual(self.done.get_nowait(), "DONE")
1515
    self.ls.acquire(["three", "four"])
1516
    self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
1517
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1518
    self.ls.remove("four")
1519
    self._waitThreads()
1520
    self.assertEqual(self.done.get_nowait(), ["six"])
1521
    self._addThread(target=self._doRemoveSet, args=(["two"]))
1522
    self._waitThreads()
1523
    self.assertEqual(self.done.get_nowait(), ["two"])
1524
    self.ls.release()
1525
    # reset lockset
1526
    self._setUpLS()
1527

    
1528
  @_Repeat
1529
  def testConcurrentSharedSetLock(self):
1530
    # share the set-lock...
1531
    self.ls.acquire(None, shared=1)
1532
    # ...another thread can share it too
1533
    self._addThread(target=self._doLockSet, args=(None, 1))
1534
    self._waitThreads()
1535
    self.assertEqual(self.done.get_nowait(), "DONE")
1536
    # ...or just share some elements
1537
    self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
1538
    self._waitThreads()
1539
    self.assertEqual(self.done.get_nowait(), "DONE")
1540
    # ...but not add new ones or remove any
1541
    t = self._addThread(target=self._doAddSet, args=(["nine"]))
1542
    self._addThread(target=self._doRemoveSet, args=(["two"], ))
1543
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1544
    # this just releases the set-lock
1545
    self.ls.release([])
1546
    t.join(60)
1547
    self.assertEqual(self.done.get_nowait(), "DONE")
1548
    # release the lock on the actual elements so remove() can proceed too
1549
    self.ls.release()
1550
    self._waitThreads()
1551
    self.failUnlessEqual(self.done.get_nowait(), ["two"])
1552
    # reset lockset
1553
    self._setUpLS()
1554

    
1555
  @_Repeat
1556
  def testConcurrentExclusiveSetLock(self):
1557
    # acquire the set-lock...
1558
    self.ls.acquire(None, shared=0)
1559
    # ...no one can do anything else
1560
    self._addThread(target=self._doLockSet, args=(None, 1))
1561
    self._addThread(target=self._doLockSet, args=(None, 0))
1562
    self._addThread(target=self._doLockSet, args=(["three"], 0))
1563
    self._addThread(target=self._doLockSet, args=(["two"], 1))
1564
    self._addThread(target=self._doAddSet, args=(["nine"]))
1565
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1566
    self.ls.release()
1567
    self._waitThreads()
1568
    for _ in range(5):
1569
      self.assertEqual(self.done.get(True, 1), "DONE")
1570
    # cleanup
1571
    self._setUpLS()
1572

    
1573
  @_Repeat
1574
  def testConcurrentSetLockAdd(self):
1575
    self.ls.acquire("one")
1576
    # Another thread wants the whole SetLock
1577
    self._addThread(target=self._doLockSet, args=(None, 0))
1578
    self._addThread(target=self._doLockSet, args=(None, 1))
1579
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1580
    self.assertRaises(AssertionError, self.ls.add, "four")
1581
    self.ls.release()
1582
    self._waitThreads()
1583
    self.assertEqual(self.done.get_nowait(), "DONE")
1584
    self.assertEqual(self.done.get_nowait(), "DONE")
1585
    self.ls.acquire(None)
1586
    self._addThread(target=self._doLockSet, args=(None, 0))
1587
    self._addThread(target=self._doLockSet, args=(None, 1))
1588
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1589
    self.ls.add("four")
1590
    self.ls.add("five", acquired=1)
1591
    self.ls.add("six", acquired=1, shared=1)
1592
    self.assertEquals(self.ls.list_owned(),
1593
      set(["one", "two", "three", "five", "six"]))
1594
    self.assertEquals(self.ls.is_owned(), True)
1595
    self.assertEquals(self.ls._names(),
1596
      set(["one", "two", "three", "four", "five", "six"]))
1597
    self.ls.release()
1598
    self._waitThreads()
1599
    self.assertEqual(self.done.get_nowait(), "DONE")
1600
    self.assertEqual(self.done.get_nowait(), "DONE")
1601
    self._setUpLS()
1602

    
1603
  @_Repeat
1604
  def testEmptyLockSet(self):
1605
    # get the set-lock
1606
    self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
1607
    # now empty it...
1608
    self.ls.remove(["one", "two", "three"])
1609
    self.assertFalse(self.ls._names())
1610
    # and adds/locks by another thread still wait
1611
    self._addThread(target=self._doAddSet, args=(["nine"]))
1612
    self._addThread(target=self._doLockSet, args=(None, 1))
1613
    self._addThread(target=self._doLockSet, args=(None, 0))
1614
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1615
    self.ls.release()
1616
    self._waitThreads()
1617
    for _ in range(3):
1618
      self.assertEqual(self.done.get_nowait(), "DONE")
1619
    # empty it again...
1620
    self.assertEqual(self.ls.remove(["nine"]), ["nine"])
1621
    # now share it...
1622
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1623
    # other sharers can go, adds still wait
1624
    self._addThread(target=self._doLockSet, args=(None, 1))
1625
    self._waitThreads()
1626
    self.assertEqual(self.done.get_nowait(), "DONE")
1627
    self._addThread(target=self._doAddSet, args=(["nine"]))
1628
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1629
    self.ls.release()
1630
    self._waitThreads()
1631
    self.assertEqual(self.done.get_nowait(), "DONE")
1632
    self._setUpLS()
1633

    
1634
  def testAcquireWithNamesDowngrade(self):
1635
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1636
    self.assertTrue(self.ls.is_owned())
1637
    self.assertFalse(self.ls._get_lock().is_owned())
1638
    self.ls.release()
1639
    self.assertFalse(self.ls.is_owned())
1640
    self.assertFalse(self.ls._get_lock().is_owned())
1641
    # Can't downgrade after releasing
1642
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1643

    
1644
  def testDowngrade(self):
1645
    # Not owning anything, must raise an exception
1646
    self.assertFalse(self.ls.is_owned())
1647
    self.assertRaises(AssertionError, self.ls.downgrade)
1648

    
1649
    self.assertFalse(compat.any(i.is_owned()
1650
                                for i in self.ls._get_lockdict().values()))
1651
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1652
    for name in self.ls._names():
1653
      self.assertFalse(self.ls.check_owned(name))
1654

    
1655
    self.assertEquals(self.ls.acquire(None, shared=0),
1656
                      set(["one", "two", "three"]))
1657
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1658

    
1659
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1660
    for name in self.ls._names():
1661
      self.assertTrue(self.ls.check_owned(name))
1662
      self.assertTrue(self.ls.check_owned(name, shared=0))
1663
      self.assertFalse(self.ls.check_owned(name, shared=1))
1664

    
1665
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1666
    self.assertTrue(compat.all(i.is_owned(shared=0)
1667
                               for i in self.ls._get_lockdict().values()))
1668

    
1669
    # Start downgrading locks
1670
    self.assertTrue(self.ls.downgrade(names=["one"]))
1671
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1672
    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1673
                               for name, lock in
1674
                                 self.ls._get_lockdict().items()))
1675

    
1676
    self.assertFalse(self.ls.check_owned("one", shared=0))
1677
    self.assertTrue(self.ls.check_owned("one", shared=1))
1678
    self.assertTrue(self.ls.check_owned("two", shared=0))
1679
    self.assertTrue(self.ls.check_owned("three", shared=0))
1680

    
1681
    # Downgrade second lock
1682
    self.assertTrue(self.ls.downgrade(names="two"))
1683
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1684
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1685
    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1686
                               for name, lock in
1687
                                 self.ls._get_lockdict().items()))
1688

    
1689
    self.assertFalse(self.ls.check_owned("one", shared=0))
1690
    self.assertTrue(self.ls.check_owned("one", shared=1))
1691
    self.assertFalse(self.ls.check_owned("two", shared=0))
1692
    self.assertTrue(self.ls.check_owned("two", shared=1))
1693
    self.assertTrue(self.ls.check_owned("three", shared=0))
1694

    
1695
    # Downgrading the last exclusive lock to shared must downgrade the
1696
    # lockset-internal lock too
1697
    self.assertTrue(self.ls.downgrade(names="three"))
1698
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1699
    self.assertTrue(compat.all(i.is_owned(shared=1)
1700
                               for i in self.ls._get_lockdict().values()))
1701

    
1702
    # Verify owned locks
1703
    for name in self.ls._names():
1704
      self.assertTrue(self.ls.check_owned(name, shared=1))
1705

    
1706
    # Downgrading a shared lock must be a no-op
1707
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1708
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1709
    self.assertTrue(compat.all(i.is_owned(shared=1)
1710
                               for i in self.ls._get_lockdict().values()))
1711

    
1712
    self.ls.release()
1713

    
1714
  def testDowngradeEverything(self):
1715
    self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
1716
                     set(["one", "two", "three"]))
1717
    self.assertTrue(self.ls.owning_all())
1718

    
1719
    # Ensure all locks are now owned in exclusive mode
1720
    for name in self.ls._names():
1721
      self.assertTrue(self.ls.check_owned(name, shared=0))
1722

    
1723
    # Downgrade everything
1724
    self.assertTrue(self.ls.downgrade())
1725

    
1726
    # Ensure all locks are now owned in shared mode
1727
    for name in self.ls._names():
1728
      self.assertTrue(self.ls.check_owned(name, shared=1))
1729

    
1730
    self.assertTrue(self.ls.owning_all())
1731

    
1732
  def testPriority(self):
1733
    def _Acquire(prev, next, name, priority, success_fn):
1734
      prev.wait()
1735
      self.assert_(self.ls.acquire(name, shared=0,
1736
                                   priority=priority,
1737
                                   test_notify=lambda _: next.set()))
1738
      try:
1739
        success_fn()
1740
      finally:
1741
        self.ls.release()
1742

    
1743
    # Get all in exclusive mode
1744
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1745

    
1746
    done_two = Queue.Queue(0)
1747

    
1748
    first = threading.Event()
1749
    prev = first
1750

    
1751
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1752
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1753

    
1754
    # Use a deterministic random generator
1755
    random.Random(741).shuffle(acquires)
1756

    
1757
    for (name, prio, done) in acquires:
1758
      ev = threading.Event()
1759
      self._addThread(target=_Acquire,
1760
                      args=(prev, ev, name, prio,
1761
                            compat.partial(done.put, "Prio%s" % prio)))
1762
      prev = ev
1763

    
1764
    # Start acquires
1765
    first.set()
1766

    
1767
    # Wait for last acquire to start
1768
    prev.wait()
1769

    
1770
    # Let threads acquire locks
1771
    self.ls.release()
1772

    
1773
    # Wait for threads to finish
1774
    self._waitThreads()
1775

    
1776
    for i in range(1, 33):
1777
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1778
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1779

    
1780
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1781
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1782

    
1783
  def testNamesWithOpportunisticAndTimeout(self):
1784
    self.assertRaises(AssertionError, self.ls.acquire,
1785
                      ["one", "two"], timeout=1.0, opportunistic=True)
1786

    
1787
  def testOpportunisticWithUnknownName(self):
1788
    name = "unknown"
1789
    self.assertFalse(name in self.ls._names())
1790
    result = self.ls.acquire(name, opportunistic=True)
1791
    self.assertFalse(result)
1792
    self.assertFalse(self.ls.list_owned())
1793

    
1794
    result = self.ls.acquire(["two", name], opportunistic=True)
1795
    self.assertEqual(result, set(["two"]))
1796
    self.assertEqual(self.ls.list_owned(), set(["two"]))
1797

    
1798
    self.ls.release()
1799

    
1800
  def testSimpleOpportunisticAcquisition(self):
1801
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1802

    
1803
    # Hold a lock in main thread
1804
    self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
1805

    
1806
    def fn():
1807
      # The lock "two" is held by the main thread
1808
      result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
1809
      self.assertEqual(result, set(["one"]))
1810
      self.assertEqual(self.ls.list_owned(), set(["one"]))
1811
      self.assertFalse(self.ls._get_lock().is_owned())
1812

    
1813
      self.ls.release()
1814
      self.assertFalse(self.ls.list_owned())
1815

    
1816
      # Try to acquire the lock held by the main thread
1817
      result = self.ls.acquire(["two"], shared=0, opportunistic=True)
1818
      self.assertFalse(self.ls._get_lock().is_owned())
1819
      self.assertFalse(result)
1820
      self.assertFalse(self.ls.list_owned())
1821

    
1822
      # Try to acquire all locks
1823
      result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
1824
      self.assertTrue(self.ls._get_lock().is_owned(),
1825
                      msg="Internal lock is not owned")
1826
      self.assertEqual(result, set(["one", "three"]))
1827
      self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
1828

    
1829
      self.ls.release()
1830

    
1831
      self.assertFalse(self.ls.list_owned())
1832

    
1833
      self.done.put(True)
1834

    
1835
    self._addThread(target=fn)
1836

    
1837
    # Wait for threads to finish
1838
    self._waitThreads()
1839

    
1840
    self.assertEqual(self.ls.list_owned(), set(["two"]))
1841

    
1842
    self.ls.release()
1843
    self.assertFalse(self.ls.list_owned())
1844
    self.assertFalse(self.ls._get_lock().is_owned())
1845

    
1846
    self.assertTrue(self.done.get_nowait())
1847
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1848

    
1849
  def testOpportunisticAcquisitionWithoutNamesExpires(self):
1850
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1851

    
1852
    # Hold all locks in main thread
1853
    self.ls.acquire(locking.ALL_SET, shared=0)
1854
    self.assertTrue(self.ls._get_lock().is_owned())
1855

    
1856
    def fn():
1857
      # Try to acquire all locks in separate thread
1858
      result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
1859
                               timeout=0.1)
1860
      self.assertFalse(result)
1861
      self.assertFalse(self.ls._get_lock().is_owned())
1862
      self.assertFalse(self.ls.list_owned())
1863

    
1864
      # Try once more without a timeout
1865
      self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
1866

    
1867
      self.done.put(True)
1868

    
1869
    self._addThread(target=fn)
1870

    
1871
    # Wait for threads to finish
1872
    self._waitThreads()
1873

    
1874
    self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
1875

    
1876
    self.ls.release()
1877
    self.assertFalse(self.ls.list_owned())
1878
    self.assertFalse(self.ls._get_lock().is_owned(shared=0))
1879

    
1880
    self.assertTrue(self.done.get_nowait())
1881
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1882

    
1883
  def testSharedOpportunisticAcquisitionWithoutNames(self):
1884
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1885

    
1886
    # Hold all locks in main thread
1887
    self.ls.acquire(locking.ALL_SET, shared=1)
1888
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1889

    
1890
    def fn():
1891
      # Try to acquire all locks in separate thread in shared mode
1892
      result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
1893
                               timeout=0.1)
1894
      self.assertEqual(result, set(["one", "two", "three"]))
1895
      self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1896
      self.ls.release()
1897
      self.assertFalse(self.ls._get_lock().is_owned())
1898

    
1899
      # Try one in exclusive mode
1900
      self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
1901

    
1902
      self.done.put(True)
1903

    
1904
    self._addThread(target=fn)
1905

    
1906
    # Wait for threads to finish
1907
    self._waitThreads()
1908

    
1909
    self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
1910

    
1911
    self.ls.release()
1912
    self.assertFalse(self.ls.list_owned())
1913
    self.assertFalse(self.ls._get_lock().is_owned())
1914

    
1915
    self.assertTrue(self.done.get_nowait())
1916
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1917

    
1918
  def testLockDeleteWithOpportunisticAcquisition(self):
1919
    # This test exercises some code handling LockError on acquisition, that is
1920
    # after all lock names have been gathered. This shouldn't happen in reality
1921
    # as removing locks from the set requires the lockset-internal lock, but
1922
    # the code should handle the situation anyway.
1923
    ready = threading.Event()
1924
    finished = threading.Event()
1925

    
1926
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1927

    
1928
    # Thread function to delete lock
1929
    def fn():
1930
      # Wait for notification
1931
      ready.wait()
1932

    
1933
      # Delete lock named "two" by accessing lockset-internal data
1934
      ld = self.ls._get_lockdict()
1935
      self.assertTrue(ld["two"].delete())
1936

    
1937
      self.done.put("deleted.two")
1938

    
1939
      # Notify helper
1940
      finished.set()
1941

    
1942
    self._addThread(target=fn)
1943

    
1944
    # Notification helper, called when lock already holds internal lock.
1945
    # Therefore only one of the locks not yet locked can be deleted.
1946
    def notify(name):
1947
      self.done.put("notify.%s" % name)
1948

    
1949
      if name == "one":
1950
        # Tell helper thread to delete lock "two"
1951
        ready.set()
1952
        finished.wait()
1953

    
1954
    # Hold all locks in main thread
1955
    self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
1956
    self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
1957

    
1958
    # Wait for threads to finish
1959
    self._waitThreads()
1960

    
1961
    # Release all locks
1962
    self.ls.release()
1963
    self.assertFalse(self.ls.list_owned())
1964
    self.assertFalse(self.ls._get_lock().is_owned())
1965

    
1966
    self.assertEqual(self.done.get_nowait(), "notify.one")
1967
    self.assertEqual(self.done.get_nowait(), "deleted.two")
1968
    self.assertEqual(self.done.get_nowait(), "notify.three")
1969
    self.assertEqual(self.done.get_nowait(), "notify.two")
1970
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1971

    
1972

    
1973
class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
1974
  def setUp(self):
1975
    self.fn = locking._GetLsAcquireModeAndTimeouts
1976

    
1977
  def testOpportunisticWithoutNames(self):
1978
    (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
1979
    self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
1980
    self.assertTrue(ls_timeout_fn is None)
1981
    self.assertEqual(timeout_fn(), 0)
1982

    
1983
  def testAllInputCombinations(self):
1984
    for want_all in [False, True]:
1985
      for timeout in [None, 0, 100]:
1986
        for opportunistic in [False, True]:
1987
          if (opportunistic and
1988
              not want_all and
1989
              timeout is not None):
1990
            # Can't accept a timeout when acquiring opportunistically
1991
            self.assertRaises(AssertionError, self.fn,
1992
                              want_all, timeout, opportunistic)
1993
          else:
1994
            (mode, ls_timeout_fn, timeout_fn) = \
1995
              self.fn(want_all, timeout, opportunistic)
1996

    
1997
            if opportunistic:
1998
              self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
1999
              self.assertEqual(timeout_fn(), 0)
2000
            else:
2001
              self.assertTrue(callable(timeout_fn))
2002
              if want_all:
2003
                self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
2004
              else:
2005
                self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
2006

    
2007
            if want_all:
2008
              self.assertTrue(callable(ls_timeout_fn))
2009
            else:
2010
              self.assertTrue(ls_timeout_fn is None)
2011

    
2012

    
2013
class TestGanetiLockManager(_ThreadedTestCase):
2014
  def setUp(self):
2015
    _ThreadedTestCase.setUp(self)
2016
    self.nodes = ["n1", "n2"]
2017
    self.nodegroups = ["g1", "g2"]
2018
    self.instances = ["i1", "i2", "i3"]
2019
    self.networks = ["net1", "net2", "net3"]
2020
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
2021
                                        self.instances, self.networks)
2022

    
2023
  def tearDown(self):
2024
    # Don't try this at home...
2025
    locking.GanetiLockManager._instance = None
2026

    
2027
  def testLockingConstants(self):
2028
    # The locking library internally cheats by assuming its constants have some
2029
    # relationships with each other. Check those hold true.
2030
    # This relationship is also used in the Processor to recursively acquire
2031
    # the right locks. Again, please don't break it.
2032
    for i in range(len(locking.LEVELS)):
2033
      self.assertEqual(i, locking.LEVELS[i])
2034

    
2035
  def testDoubleGLFails(self):
2036
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
2037

    
2038
  def testLockNames(self):
2039
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
2040
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
2041
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
2042
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
2043
                     set(self.nodegroups))
2044
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
2045
                     set(self.instances))
2046
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
2047
                     set(self.networks))
2048

    
2049
  def testInitAndResources(self):
2050
    locking.GanetiLockManager._instance = None
2051
    self.GL = locking.GanetiLockManager([], [], [], [])
2052
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
2053
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
2054
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
2055
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
2056
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
2057
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
2058

    
2059
    locking.GanetiLockManager._instance = None
2060
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
2061
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
2062
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
2063
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
2064
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
2065
                                    set(self.nodegroups))
2066
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
2067
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
2068

    
2069
    locking.GanetiLockManager._instance = None
2070
    self.GL = locking.GanetiLockManager([], [], self.instances, [])
2071
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
2072
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
2073
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
2074
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
2075
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
2076
                     set(self.instances))
2077

    
2078
    locking.GanetiLockManager._instance = None
2079
    self.GL = locking.GanetiLockManager([], [], [], self.networks)
2080
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
2081
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
2082
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
2083
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
2084
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
2085
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
2086
                     set(self.networks))
2087

    
2088
  def testAcquireRelease(self):
2089
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2090
    self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
2091
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
2092
    self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
2093
    self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
2094
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
2095
                                        shared=1))
2096
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
2097
    self.GL.release(locking.LEVEL_NODE, ["n2"])
2098
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
2099
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
2100
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
2101
    self.GL.release(locking.LEVEL_NODE)
2102
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
2103
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
2104
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
2105
    self.GL.release(locking.LEVEL_NODEGROUP)
2106
    self.GL.release(locking.LEVEL_INSTANCE)
2107
    self.assertRaises(errors.LockError, self.GL.acquire,
2108
                      locking.LEVEL_INSTANCE, ["i5"])
2109
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
2110
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))
2111

    
2112
  def testAcquireWholeSets(self):
2113
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2114
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
2115
                      set(self.instances))
2116
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
2117
                      set(self.instances))
2118
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
2119
                      set(self.nodegroups))
2120
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
2121
                      set(self.nodegroups))
2122
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
2123
                      set(self.nodes))
2124
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
2125
                      set(self.nodes))
2126
    self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
2127
    self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
2128
    self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))
2129
    self.GL.release(locking.LEVEL_NODE)
2130
    self.GL.release(locking.LEVEL_NODEGROUP)
2131
    self.GL.release(locking.LEVEL_INSTANCE)
2132
    self.GL.release(locking.LEVEL_CLUSTER)
2133

    
2134
  def testAcquireWholeAndPartial(self):
2135
    self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
2136
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2137
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
2138
                      set(self.instances))
2139
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
2140
                      set(self.instances))
2141
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
2142
                      set(["n2"]))
2143
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
2144
                      set(["n2"]))
2145
    self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
2146
    self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))
2147
    self.GL.release(locking.LEVEL_NODE)
2148
    self.GL.release(locking.LEVEL_INSTANCE)
2149
    self.GL.release(locking.LEVEL_CLUSTER)
2150

    
2151
  def testBGLDependency(self):
2152
    self.assertRaises(AssertionError, self.GL.acquire,
2153
                      locking.LEVEL_NODE, ["n1", "n2"])
2154
    self.assertRaises(AssertionError, self.GL.acquire,
2155
                      locking.LEVEL_INSTANCE, ["i3"])
2156
    self.assertRaises(AssertionError, self.GL.acquire,
2157
                      locking.LEVEL_NODEGROUP, ["g1"])
2158
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2159
    self.GL.acquire(locking.LEVEL_NODE, ["n1"])
2160
    self.assertRaises(AssertionError, self.GL.release,
2161
                      locking.LEVEL_CLUSTER, ["BGL"])
2162
    self.assertRaises(AssertionError, self.GL.release,
2163
                      locking.LEVEL_CLUSTER)
2164
    self.GL.release(locking.LEVEL_NODE)
2165
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
2166
    self.assertRaises(AssertionError, self.GL.release,
2167
                      locking.LEVEL_CLUSTER, ["BGL"])
2168
    self.assertRaises(AssertionError, self.GL.release,
2169
                      locking.LEVEL_CLUSTER)
2170
    self.GL.release(locking.LEVEL_INSTANCE)
2171
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
2172
    self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
2173
    self.assertRaises(AssertionError, self.GL.release,
2174
                      locking.LEVEL_CLUSTER, ["BGL"])
2175
    self.assertRaises(AssertionError, self.GL.release,
2176
                      locking.LEVEL_CLUSTER)
2177
    self.GL.release(locking.LEVEL_NODEGROUP)
2178
    self.GL.release(locking.LEVEL_CLUSTER)
2179

    
2180
  def testWrongOrder(self):
2181
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2182
    self.GL.acquire(locking.LEVEL_NODE, ["n2"])
2183
    self.assertRaises(AssertionError, self.GL.acquire,
2184
                      locking.LEVEL_NODE, ["n1"])
2185
    self.assertRaises(AssertionError, self.GL.acquire,
2186
                      locking.LEVEL_NODEGROUP, ["g1"])
2187
    self.assertRaises(AssertionError, self.GL.acquire,
2188
                      locking.LEVEL_INSTANCE, ["i2"])
2189

    
2190
  def testModifiableLevels(self):
2191
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
2192
                      ["BGL2"])
2193
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC,
2194
                      ["NAL2"])
2195
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
2196
    self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
2197
    self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
2198
    self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
2199
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
2200
    self.GL.add(locking.LEVEL_NODE, ["n3"])
2201
    self.GL.remove(locking.LEVEL_NODE, ["n1"])
2202
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
2203
    self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
2204
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
2205
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
2206
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))
2207
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
2208
                      ["BGL2"])
2209

    
2210
  # Helper function to run as a thread that shared the BGL and then acquires
2211
  # some locks at another level.
2212
  def _doLock(self, level, names, shared):
2213
    try:
2214
      self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2215
      self.GL.acquire(level, names, shared=shared)
2216
      self.done.put("DONE")
2217
      self.GL.release(level)
2218
      self.GL.release(locking.LEVEL_CLUSTER)
2219
    except errors.LockError:
2220
      self.done.put("ERR")
2221

    
2222
  @_Repeat
2223
  def testConcurrency(self):
2224
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
2225
    self._addThread(target=self._doLock,
2226
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
2227
    self._waitThreads()
2228
    self.assertEqual(self.done.get_nowait(), "DONE")
2229
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
2230
    self._addThread(target=self._doLock,
2231
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
2232
    self._waitThreads()
2233
    self.assertEqual(self.done.get_nowait(), "DONE")
2234
    self._addThread(target=self._doLock,
2235
                    args=(locking.LEVEL_INSTANCE, "i3", 1))
2236
    self.assertRaises(Queue.Empty, self.done.get_nowait)
2237
    self.GL.release(locking.LEVEL_INSTANCE)
2238
    self._waitThreads()
2239
    self.assertEqual(self.done.get_nowait(), "DONE")
2240
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
2241
    self._addThread(target=self._doLock,
2242
                    args=(locking.LEVEL_INSTANCE, "i2", 1))
2243
    self._waitThreads()
2244
    self.assertEqual(self.done.get_nowait(), "DONE")
2245
    self._addThread(target=self._doLock,
2246
                    args=(locking.LEVEL_INSTANCE, "i2", 0))
2247
    self.assertRaises(Queue.Empty, self.done.get_nowait)
2248
    self.GL.release(locking.LEVEL_INSTANCE)
2249
    self._waitThreads()
2250
    self.assertEqual(self.done.get(True, 1), "DONE")
2251
    self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
2252

    
2253

    
2254
class TestLockMonitor(_ThreadedTestCase):
2255
  def setUp(self):
2256
    _ThreadedTestCase.setUp(self)
2257
    self.lm = locking.LockMonitor()
2258

    
2259
  def testSingleThread(self):
2260
    locks = []
2261

    
2262
    for i in range(100):
2263
      name = "TestLock%s" % i
2264
      locks.append(locking.SharedLock(name, monitor=self.lm))
2265

    
2266
    self.assertEqual(len(self.lm._locks), len(locks))
2267
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
2268
    self.assertEqual(len(result.fields), 1)
2269
    self.assertEqual(len(result.data), 100)
2270

    
2271
    # Delete all locks
2272
    del locks[:]
2273

    
2274
    # The garbage collector might needs some time
2275
    def _CheckLocks():
2276
      if self.lm._locks:
2277
        raise utils.RetryAgain()
2278

    
2279
    utils.Retry(_CheckLocks, 0.1, 30.0)
2280

    
2281
    self.assertFalse(self.lm._locks)
2282

    
2283
  def testMultiThread(self):
2284
    locks = []
2285

    
2286
    def _CreateLock(prev, next, name):
2287
      prev.wait()
2288
      locks.append(locking.SharedLock(name, monitor=self.lm))
2289
      if next:
2290
        next.set()
2291

    
2292
    expnames = []
2293

    
2294
    first = threading.Event()
2295
    prev = first
2296

    
2297
    # Use a deterministic random generator
2298
    for i in random.Random(4263).sample(range(100), 33):
2299
      name = "MtTestLock%s" % i
2300
      expnames.append(name)
2301

    
2302
      ev = threading.Event()
2303
      self._addThread(target=_CreateLock, args=(prev, ev, name))
2304
      prev = ev
2305

    
2306
    # Add locks
2307
    first.set()
2308
    self._waitThreads()
2309

    
2310
    # Check order in which locks were added
2311
    self.assertEqual([i.name for i in locks], expnames)
2312

    
2313
    # Check query result
2314
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2315
    self.assert_(isinstance(result, dict))
2316
    response = objects.QueryResponse.FromDict(result)
2317
    self.assertEqual(response.data,
2318
                     [[(constants.RS_NORMAL, name),
2319
                       (constants.RS_NORMAL, None),
2320
                       (constants.RS_NORMAL, None),
2321
                       (constants.RS_NORMAL, [])]
2322
                      for name in utils.NiceSort(expnames)])
2323
    self.assertEqual(len(response.fields), 4)
2324
    self.assertEqual(["name", "mode", "owner", "pending"],
2325
                     [fdef.name for fdef in response.fields])
2326

    
2327
    # Test exclusive acquire
2328
    for tlock in locks[::4]:
2329
      tlock.acquire(shared=0)
2330
      try:
2331
        def _GetExpResult(name):
2332
          if tlock.name == name:
2333
            return [(constants.RS_NORMAL, name),
2334
                    (constants.RS_NORMAL, "exclusive"),
2335
                    (constants.RS_NORMAL,
2336
                     [threading.currentThread().getName()]),
2337
                    (constants.RS_NORMAL, [])]
2338
          return [(constants.RS_NORMAL, name),
2339
                  (constants.RS_NORMAL, None),
2340
                  (constants.RS_NORMAL, None),
2341
                  (constants.RS_NORMAL, [])]
2342

    
2343
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2344
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2345
                         [_GetExpResult(name)
2346
                          for name in utils.NiceSort(expnames)])
2347
      finally:
2348
        tlock.release()
2349

    
2350
    # Test shared acquire
2351
    def _Acquire(lock, shared, ev, notify):
2352
      lock.acquire(shared=shared)
2353
      try:
2354
        notify.set()
2355
        ev.wait()
2356
      finally:
2357
        lock.release()
2358

    
2359
    for tlock1 in locks[::11]:
2360
      for tlock2 in locks[::-15]:
2361
        if tlock2 == tlock1:
2362
          # Avoid deadlocks
2363
          continue
2364

    
2365
        for tlock3 in locks[::10]:
2366
          if tlock3 in (tlock2, tlock1):
2367
            # Avoid deadlocks
2368
            continue
2369

    
2370
          releaseev = threading.Event()
2371

    
2372
          # Acquire locks
2373
          acquireev = []
2374
          tthreads1 = []
2375
          for i in range(3):
2376
            ev = threading.Event()
2377
            tthreads1.append(self._addThread(target=_Acquire,
2378
                                             args=(tlock1, 1, releaseev, ev)))
2379
            acquireev.append(ev)
2380

    
2381
          ev = threading.Event()
2382
          tthread2 = self._addThread(target=_Acquire,
2383
                                     args=(tlock2, 1, releaseev, ev))
2384
          acquireev.append(ev)
2385

    
2386
          ev = threading.Event()
2387
          tthread3 = self._addThread(target=_Acquire,
2388
                                     args=(tlock3, 0, releaseev, ev))
2389
          acquireev.append(ev)
2390

    
2391
          # Wait for all locks to be acquired
2392
          for i in acquireev:
2393
            i.wait()
2394

    
2395
          # Check query result
2396
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2397
          response = objects.QueryResponse.FromDict(result)
2398
          for (name, mode, owner) in response.data:
2399
            (name_status, name_value) = name
2400
            (owner_status, owner_value) = owner
2401

    
2402
            self.assertEqual(name_status, constants.RS_NORMAL)
2403
            self.assertEqual(owner_status, constants.RS_NORMAL)
2404

    
2405
            if name_value == tlock1.name:
2406
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2407
              self.assertEqual(set(owner_value),
2408
                               set(i.getName() for i in tthreads1))
2409
              continue
2410

    
2411
            if name_value == tlock2.name:
2412
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2413
              self.assertEqual(owner_value, [tthread2.getName()])
2414
              continue
2415

    
2416
            if name_value == tlock3.name:
2417
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
2418
              self.assertEqual(owner_value, [tthread3.getName()])
2419
              continue
2420

    
2421
            self.assert_(name_value in expnames)
2422
            self.assertEqual(mode, (constants.RS_NORMAL, None))
2423
            self.assert_(owner_value is None)
2424

    
2425
          # Release locks again
2426
          releaseev.set()
2427

    
2428
          self._waitThreads()
2429

    
2430
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2431
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
2432
                           [[(constants.RS_NORMAL, name),
2433
                             (constants.RS_NORMAL, None),
2434
                             (constants.RS_NORMAL, None)]
2435
                            for name in utils.NiceSort(expnames)])
2436

    
2437
  def testDelete(self):
2438
    lock = locking.SharedLock("TestLock", monitor=self.lm)
2439

    
2440
    self.assertEqual(len(self.lm._locks), 1)
2441
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2442
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2443
                     [[(constants.RS_NORMAL, lock.name),
2444
                       (constants.RS_NORMAL, None),
2445
                       (constants.RS_NORMAL, None)]])
2446

    
2447
    lock.delete()
2448

    
2449
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2450
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2451
                     [[(constants.RS_NORMAL, lock.name),
2452
                       (constants.RS_NORMAL, "deleted"),
2453
                       (constants.RS_NORMAL, None)]])
2454
    self.assertEqual(len(self.lm._locks), 1)
2455

    
2456
  def testPending(self):
2457
    def _Acquire(lock, shared, prev, next):
2458
      prev.wait()
2459

    
2460
      lock.acquire(shared=shared, test_notify=next.set)
2461
      try:
2462
        pass
2463
      finally:
2464
        lock.release()
2465

    
2466
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
2467

    
2468
    for shared in [0, 1]:
2469
      lock.acquire()
2470
      try:
2471
        self.assertEqual(len(self.lm._locks), 1)
2472
        result = self.lm.QueryLocks(["name", "mode", "owner"])
2473
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2474
                         [[(constants.RS_NORMAL, lock.name),
2475
                           (constants.RS_NORMAL, "exclusive"),
2476
                           (constants.RS_NORMAL,
2477
                            [threading.currentThread().getName()])]])
2478

    
2479
        threads = []
2480

    
2481
        first = threading.Event()
2482
        prev = first
2483

    
2484
        for i in range(5):
2485
          ev = threading.Event()
2486
          threads.append(self._addThread(target=_Acquire,
2487
                                          args=(lock, shared, prev, ev)))
2488
          prev = ev
2489

    
2490
        # Start acquires
2491
        first.set()
2492

    
2493
        # Wait for last acquire to start waiting
2494
        prev.wait()
2495

    
2496
        # NOTE: This works only because QueryLocks will acquire the
2497
        # lock-internal lock again and won't be able to get the information
2498
        # until it has the lock. By then the acquire should be registered in
2499
        # SharedLock.__pending (otherwise it's a bug).
2500

    
2501
        # All acquires are waiting now
2502
        if shared:
2503
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2504
        else:
2505
          pending = [("exclusive", [t.getName()]) for t in threads]
2506

    
2507
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2508
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2509
                         [[(constants.RS_NORMAL, lock.name),
2510
                           (constants.RS_NORMAL, "exclusive"),
2511
                           (constants.RS_NORMAL,
2512
                            [threading.currentThread().getName()]),
2513
                           (constants.RS_NORMAL, pending)]])
2514

    
2515
        self.assertEqual(len(self.lm._locks), 1)
2516
      finally:
2517
        lock.release()
2518

    
2519
      self._waitThreads()
2520

    
2521
      # No pending acquires
2522
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2523
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2524
                       [[(constants.RS_NORMAL, lock.name),
2525
                         (constants.RS_NORMAL, None),
2526
                         (constants.RS_NORMAL, None),
2527
                         (constants.RS_NORMAL, [])]])
2528

    
2529
      self.assertEqual(len(self.lm._locks), 1)
2530

    
2531
  def testDeleteAndRecreate(self):
2532
    lname = "TestLock101923193"
2533

    
2534
    # Create some locks with the same name and keep all references
2535
    locks = [locking.SharedLock(lname, monitor=self.lm)
2536
             for _ in range(5)]
2537

    
2538
    self.assertEqual(len(self.lm._locks), len(locks))
2539

    
2540
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2541
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2542
                     [[(constants.RS_NORMAL, lname),
2543
                       (constants.RS_NORMAL, None),
2544
                       (constants.RS_NORMAL, None)]] * 5)
2545

    
2546
    locks[2].delete()
2547

    
2548
    # Check information order
2549
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2550
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2551
                     [[(constants.RS_NORMAL, lname),
2552
                       (constants.RS_NORMAL, None),
2553
                       (constants.RS_NORMAL, None)]] * 2 +
2554
                     [[(constants.RS_NORMAL, lname),
2555
                       (constants.RS_NORMAL, "deleted"),
2556
                       (constants.RS_NORMAL, None)]] +
2557
                     [[(constants.RS_NORMAL, lname),
2558
                       (constants.RS_NORMAL, None),
2559
                       (constants.RS_NORMAL, None)]] * 2)
2560

    
2561
    locks[1].acquire(shared=0)
2562

    
2563
    last_status = [
2564
      [(constants.RS_NORMAL, lname),
2565
       (constants.RS_NORMAL, None),
2566
       (constants.RS_NORMAL, None)],
2567
      [(constants.RS_NORMAL, lname),
2568
       (constants.RS_NORMAL, "exclusive"),
2569
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
2570
      [(constants.RS_NORMAL, lname),
2571
       (constants.RS_NORMAL, "deleted"),
2572
       (constants.RS_NORMAL, None)],
2573
      [(constants.RS_NORMAL, lname),
2574
       (constants.RS_NORMAL, None),
2575
       (constants.RS_NORMAL, None)],
2576
      [(constants.RS_NORMAL, lname),
2577
       (constants.RS_NORMAL, None),
2578
       (constants.RS_NORMAL, None)],
2579
      ]
2580

    
2581
    # Check information order
2582
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2583
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2584

    
2585
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2586
    self.assertEqual(len(self.lm._locks), len(locks))
2587

    
2588
    # Check lock deletion
2589
    for idx in range(len(locks)):
2590
      del locks[0]
2591
      assert gc.isenabled()
2592
      gc.collect()
2593
      self.assertEqual(len(self.lm._locks), len(locks))
2594
      result = self.lm.QueryLocks(["name", "mode", "owner"])
2595
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2596
                       last_status[idx + 1:])
2597

    
2598
    # All locks should have been deleted
2599
    assert not locks
2600
    self.assertFalse(self.lm._locks)
2601

    
2602
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2603
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2604

    
2605
  class _FakeLock:
2606
    def __init__(self):
2607
      self._info = []
2608

    
2609
    def AddResult(self, *args):
2610
      self._info.append(args)
2611

    
2612
    def CountPending(self):
2613
      return len(self._info)
2614

    
2615
    def GetLockInfo(self, requested):
2616
      (exp_requested, result) = self._info.pop(0)
2617

    
2618
      if exp_requested != requested:
2619
        raise Exception("Requested information (%s) does not match"
2620
                        " expectations (%s)" % (requested, exp_requested))
2621

    
2622
      return result
2623

    
2624
  def testMultipleResults(self):
2625
    fl1 = self._FakeLock()
2626
    fl2 = self._FakeLock()
2627

    
2628
    self.lm.RegisterLock(fl1)
2629
    self.lm.RegisterLock(fl2)
2630

    
2631
    # Empty information
2632
    for i in [fl1, fl2]:
2633
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2634
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2635
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2636
    for i in [fl1, fl2]:
2637
      self.assertEqual(i.CountPending(), 0)
2638

    
2639
    # Check ordering
2640
    for fn in [lambda x: x, reversed, sorted]:
2641
      fl1.AddResult(set(), list(fn([
2642
        ("aaa", None, None, None),
2643
        ("bbb", None, None, None),
2644
        ])))
2645
      fl2.AddResult(set(), [])
2646
      result = self.lm.QueryLocks(["name"])
2647
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2648
        [(constants.RS_NORMAL, "aaa")],
2649
        [(constants.RS_NORMAL, "bbb")],
2650
        ])
2651
      for i in [fl1, fl2]:
2652
        self.assertEqual(i.CountPending(), 0)
2653

    
2654
      for fn2 in [lambda x: x, reversed, sorted]:
2655
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
2656
          # Same name, but different information
2657
          ("aaa", "mode0", None, None),
2658
          ("aaa", "mode1", None, None),
2659
          ("aaa", "mode2", None, None),
2660
          ("aaa", "mode3", None, None),
2661
          ])))
2662
        fl2.AddResult(set([query.LQ_MODE]), [
2663
          ("zzz", "end", None, None),
2664
          ("000", "start", None, None),
2665
          ] + list(fn2([
2666
          ("aaa", "b200", None, None),
2667
          ("aaa", "b300", None, None),
2668
          ])))
2669
        result = self.lm.QueryLocks(["name", "mode"])
2670
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2671
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2672
          ] + list(fn([
2673
          # Name is the same, so order must be equal to incoming order
2674
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2675
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2676
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2677
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2678
          ])) + list(fn2([
2679
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2680
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2681
          ])) + [
2682
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2683
          ])
2684
        for i in [fl1, fl2]:
2685
          self.assertEqual(i.CountPending(), 0)
2686

    
2687

    
2688
if __name__ == "__main__":
2689
  testutils.GanetiTestProgram()