Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 44b4eddc

History | View | Annotate | Download (76.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190
    # This new thread can't acquire the lock, and thus call wait, before we
191
    # release it
192
    self._addThread(target=fn)
193
    self.cond.notifyAll()
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195
    self.cond.release()
196

    
197
    # We should now get 3 W and 1 A (for the new thread) in whatever order
198
    w = 0
199
    a = 0
200
    for i in range(4):
201
      got = self.done.get(True, 1)
202
      if got == "W":
203
        w += 1
204
      elif got == "A":
205
        a += 1
206
      else:
207
        self.fail("Got %s on the done queue" % got)
208

    
209
    self.assertEqual(w, 3)
210
    self.assertEqual(a, 1)
211

    
212
    self.cond.acquire()
213
    self.cond.notifyAll()
214
    self.cond.release()
215
    self._waitThreads()
216
    self.assertEqual(self.done.get_nowait(), "W")
217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
218

    
219
  def testBlockingWait(self):
220
    def _BlockingWait():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(None)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_BlockingWait)
228

    
229
  def testLongTimeoutWait(self):
230
    def _Helper():
231
      self.cond.acquire()
232
      self.done.put("A")
233
      self.cond.wait(15.0)
234
      self.cond.release()
235
      self.done.put("W")
236

    
237
    self._TestWait(_Helper)
238

    
239
  def _TimeoutWait(self, timeout, check):
240
    self.cond.acquire()
241
    self.cond.wait(timeout)
242
    self.cond.release()
243
    self.done.put(check)
244

    
245
  def testShortTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248
    self._waitThreads()
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertEqual(self.done.get_nowait(), "T1")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253
  def testZeroTimeoutWait(self):
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257
    self._waitThreads()
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertEqual(self.done.get_nowait(), "T0")
261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262

    
263

    
264
class TestSharedLock(_ThreadedTestCase):
265
  """SharedLock tests"""
266

    
267
  def setUp(self):
268
    _ThreadedTestCase.setUp(self)
269
    self.sl = locking.SharedLock("TestSharedLock")
270

    
271
  def testSequenceAndOwnership(self):
272
    self.assertFalse(self.sl._is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl._is_owned())
275
    self.assert_(self.sl._is_owned(shared=1))
276
    self.assertFalse(self.sl._is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl._is_owned())
279
    self.sl.acquire()
280
    self.assert_(self.sl._is_owned())
281
    self.assertFalse(self.sl._is_owned(shared=1))
282
    self.assert_(self.sl._is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl._is_owned())
285
    self.sl.acquire(shared=1)
286
    self.assert_(self.sl._is_owned())
287
    self.assert_(self.sl._is_owned(shared=1))
288
    self.assertFalse(self.sl._is_owned(shared=0))
289
    self.sl.release()
290
    self.assertFalse(self.sl._is_owned())
291

    
292
  def testBooleanValue(self):
293
    # semaphores are supposed to return a true value on a successful acquire
294
    self.assert_(self.sl.acquire(shared=1))
295
    self.sl.release()
296
    self.assert_(self.sl.acquire())
297
    self.sl.release()
298

    
299
  def testDoubleLockingStoE(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  def testDoubleLockingEtoS(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingStoS(self):
308
    self.sl.acquire(shared=1)
309
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310

    
311
  def testDoubleLockingEtoE(self):
312
    self.sl.acquire()
313
    self.assertRaises(AssertionError, self.sl.acquire)
314

    
315
  # helper functions: called in a separate thread they acquire the lock, send
316
  # their identifier on the done queue, then release it.
317
  def _doItSharer(self):
318
    try:
319
      self.sl.acquire(shared=1)
320
      self.done.put('SHR')
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put('ERR')
324

    
325
  def _doItExclusive(self):
326
    try:
327
      self.sl.acquire()
328
      self.done.put('EXC')
329
      self.sl.release()
330
    except errors.LockError:
331
      self.done.put('ERR')
332

    
333
  def _doItDelete(self):
334
    try:
335
      self.sl.delete()
336
      self.done.put('DEL')
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def testSharersCanCoexist(self):
341
    self.sl.acquire(shared=1)
342
    threading.Thread(target=self._doItSharer).start()
343
    self.assert_(self.done.get(True, 1))
344
    self.sl.release()
345

    
346
  @_Repeat
347
  def testExclusiveBlocksExclusive(self):
348
    self.sl.acquire()
349
    self._addThread(target=self._doItExclusive)
350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
351
    self.sl.release()
352
    self._waitThreads()
353
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354

    
355
  @_Repeat
356
  def testExclusiveBlocksDelete(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItDelete)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363
    self.sl = locking.SharedLock(self.sl.name)
364

    
365
  @_Repeat
366
  def testExclusiveBlocksSharer(self):
367
    self.sl.acquire()
368
    self._addThread(target=self._doItSharer)
369
    self.assertRaises(Queue.Empty, self.done.get_nowait)
370
    self.sl.release()
371
    self._waitThreads()
372
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373

    
374
  @_Repeat
375
  def testSharerBlocksExclusive(self):
376
    self.sl.acquire(shared=1)
377
    self._addThread(target=self._doItExclusive)
378
    self.assertRaises(Queue.Empty, self.done.get_nowait)
379
    self.sl.release()
380
    self._waitThreads()
381
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382

    
383
  @_Repeat
384
  def testSharerBlocksDelete(self):
385
    self.sl.acquire(shared=1)
386
    self._addThread(target=self._doItDelete)
387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
388
    self.sl.release()
389
    self._waitThreads()
390
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391
    self.sl = locking.SharedLock(self.sl.name)
392

    
393
  @_Repeat
394
  def testWaitingExclusiveBlocksSharer(self):
395
    """SKIPPED testWaitingExclusiveBlockSharer"""
396
    return
397

    
398
    self.sl.acquire(shared=1)
399
    # the lock is acquired in shared mode...
400
    self._addThread(target=self._doItExclusive)
401
    # ...but now an exclusive is waiting...
402
    self._addThread(target=self._doItSharer)
403
    # ...so the sharer should be blocked as well
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    # The exclusive passed before
408
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410

    
411
  @_Repeat
412
  def testWaitingSharerBlocksExclusive(self):
413
    """SKIPPED testWaitingSharerBlocksExclusive"""
414
    return
415

    
416
    self.sl.acquire()
417
    # the lock is acquired in exclusive mode...
418
    self._addThread(target=self._doItSharer)
419
    # ...but now a sharer is waiting...
420
    self._addThread(target=self._doItExclusive)
421
    # ...the exclusive is waiting too...
422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
423
    self.sl.release()
424
    self._waitThreads()
425
    # The sharer passed before
426
    self.assertEqual(self.done.get_nowait(), 'SHR')
427
    self.assertEqual(self.done.get_nowait(), 'EXC')
428

    
429
  def testDelete(self):
430
    self.sl.delete()
431
    self.assertRaises(errors.LockError, self.sl.acquire)
432
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433
    self.assertRaises(errors.LockError, self.sl.delete)
434

    
435
  def testDeleteTimeout(self):
436
    self.sl.delete(timeout=60)
437

    
438
  def testNoDeleteIfSharer(self):
439
    self.sl.acquire(shared=1)
440
    self.assertRaises(AssertionError, self.sl.delete)
441

    
442
  @_Repeat
443
  def testDeletePendingSharersExclusiveDelete(self):
444
    self.sl.acquire()
445
    self._addThread(target=self._doItSharer)
446
    self._addThread(target=self._doItSharer)
447
    self._addThread(target=self._doItExclusive)
448
    self._addThread(target=self._doItDelete)
449
    self.sl.delete()
450
    self._waitThreads()
451
    # The threads who were pending return ERR
452
    for _ in range(4):
453
      self.assertEqual(self.done.get_nowait(), 'ERR')
454
    self.sl = locking.SharedLock(self.sl.name)
455

    
456
  @_Repeat
457
  def testDeletePendingDeleteExclusiveSharers(self):
458
    self.sl.acquire()
459
    self._addThread(target=self._doItDelete)
460
    self._addThread(target=self._doItExclusive)
461
    self._addThread(target=self._doItSharer)
462
    self._addThread(target=self._doItSharer)
463
    self.sl.delete()
464
    self._waitThreads()
465
    # The two threads who were pending return both ERR
466
    self.assertEqual(self.done.get_nowait(), 'ERR')
467
    self.assertEqual(self.done.get_nowait(), 'ERR')
468
    self.assertEqual(self.done.get_nowait(), 'ERR')
469
    self.assertEqual(self.done.get_nowait(), 'ERR')
470
    self.sl = locking.SharedLock(self.sl.name)
471

    
472
  @_Repeat
473
  def testExclusiveAcquireTimeout(self):
474
    for shared in [0, 1]:
475
      on_queue = threading.Event()
476
      release_exclusive = threading.Event()
477

    
478
      def _LockExclusive():
479
        self.sl.acquire(shared=0, test_notify=on_queue.set)
480
        self.done.put("A: start wait")
481
        release_exclusive.wait()
482
        self.done.put("A: end wait")
483
        self.sl.release()
484

    
485
      # Start thread to hold lock in exclusive mode
486
      self._addThread(target=_LockExclusive)
487

    
488
      # Wait for wait to begin
489
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
490

    
491
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492
      # on the queue
493
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494
                                      test_notify=release_exclusive.set))
495

    
496
      self.done.put("got 2nd")
497
      self.sl.release()
498

    
499
      self._waitThreads()
500

    
501
      self.assertEqual(self.done.get_nowait(), "A: end wait")
502
      self.assertEqual(self.done.get_nowait(), "got 2nd")
503
      self.assertRaises(Queue.Empty, self.done.get_nowait)
504

    
505
  @_Repeat
506
  def testAcquireExpiringTimeout(self):
507
    def _AcquireWithTimeout(shared, timeout):
508
      if not self.sl.acquire(shared=shared, timeout=timeout):
509
        self.done.put("timeout")
510

    
511
    for shared in [0, 1]:
512
      # Lock exclusively
513
      self.sl.acquire()
514

    
515
      # Start shared acquires with timeout between 0 and 20 ms
516
      for i in range(11):
517
        self._addThread(target=_AcquireWithTimeout,
518
                        args=(shared, i * 2.0 / 1000.0))
519

    
520
      # Wait for threads to finish (makes sure the acquire timeout expires
521
      # before releasing the lock)
522
      self._waitThreads()
523

    
524
      # Release lock
525
      self.sl.release()
526

    
527
      for _ in range(11):
528
        self.assertEqual(self.done.get_nowait(), "timeout")
529

    
530
      self.assertRaises(Queue.Empty, self.done.get_nowait)
531

    
532
  @_Repeat
533
  def testSharedSkipExclusiveAcquires(self):
534
    # Tests whether shared acquires jump in front of exclusive acquires in the
535
    # queue.
536

    
537
    def _Acquire(shared, name, notify_ev, wait_ev):
538
      if notify_ev:
539
        notify_fn = notify_ev.set
540
      else:
541
        notify_fn = None
542

    
543
      if wait_ev:
544
        wait_ev.wait()
545

    
546
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547
        return
548

    
549
      self.done.put(name)
550
      self.sl.release()
551

    
552
    # Get exclusive lock while we fill the queue
553
    self.sl.acquire()
554

    
555
    shrcnt1 = 5
556
    shrcnt2 = 7
557
    shrcnt3 = 9
558
    shrcnt4 = 2
559

    
560
    # Add acquires using threading.Event for synchronization. They'll be
561
    # acquired exactly in the order defined in this list.
562
    acquires = (shrcnt1 * [(1, "shared 1")] +
563
                3 * [(0, "exclusive 1")] +
564
                shrcnt2 * [(1, "shared 2")] +
565
                shrcnt3 * [(1, "shared 3")] +
566
                shrcnt4 * [(1, "shared 4")] +
567
                3 * [(0, "exclusive 2")])
568

    
569
    ev_cur = None
570
    ev_prev = None
571

    
572
    for args in acquires:
573
      ev_cur = threading.Event()
574
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575
      ev_prev = ev_cur
576

    
577
    # Wait for last acquire to start
578
    ev_prev.wait()
579

    
580
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
581
    # together
582
    self.assertEqual(self.sl._count_pending(), 7)
583

    
584
    # Release exclusive lock and wait
585
    self.sl.release()
586

    
587
    self._waitThreads()
588

    
589
    # Check sequence
590
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591
      # Shared locks aren't guaranteed to be notified in order, but they'll be
592
      # first
593
      tmp = self.done.get_nowait()
594
      if tmp == "shared 1":
595
        shrcnt1 -= 1
596
      elif tmp == "shared 2":
597
        shrcnt2 -= 1
598
      elif tmp == "shared 3":
599
        shrcnt3 -= 1
600
      elif tmp == "shared 4":
601
        shrcnt4 -= 1
602
    self.assertEqual(shrcnt1, 0)
603
    self.assertEqual(shrcnt2, 0)
604
    self.assertEqual(shrcnt3, 0)
605
    self.assertEqual(shrcnt3, 0)
606

    
607
    for _ in range(3):
608
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
609

    
610
    for _ in range(3):
611
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
612

    
613
    self.assertRaises(Queue.Empty, self.done.get_nowait)
614

    
615
  def testIllegalDowngrade(self):
616
    # Not yet acquired
617
    self.assertRaises(AssertionError, self.sl.downgrade)
618

    
619
    # Acquire in shared mode, downgrade should be no-op
620
    self.assertTrue(self.sl.acquire(shared=1))
621
    self.assertTrue(self.sl._is_owned(shared=1))
622
    self.assertTrue(self.sl.downgrade())
623
    self.assertTrue(self.sl._is_owned(shared=1))
624
    self.sl.release()
625

    
626
  def testDowngrade(self):
627
    self.assertTrue(self.sl.acquire())
628
    self.assertTrue(self.sl._is_owned(shared=0))
629
    self.assertTrue(self.sl.downgrade())
630
    self.assertTrue(self.sl._is_owned(shared=1))
631
    self.sl.release()
632

    
633
  @_Repeat
634
  def testDowngradeJumpsAheadOfExclusive(self):
635
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636
      self.assertTrue(self.sl.acquire())
637
      self.assertTrue(self.sl._is_owned(shared=0))
638
      ev_got.set()
639
      ev_downgrade.wait()
640
      self.assertTrue(self.sl._is_owned(shared=0))
641
      self.assertTrue(self.sl.downgrade())
642
      self.assertTrue(self.sl._is_owned(shared=1))
643
      ev_release.wait()
644
      self.assertTrue(self.sl._is_owned(shared=1))
645
      self.sl.release()
646

    
647
    def _KeepExclusive2(ev_started, ev_release):
648
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649
      self.assertTrue(self.sl._is_owned(shared=0))
650
      ev_release.wait()
651
      self.assertTrue(self.sl._is_owned(shared=0))
652
      self.sl.release()
653

    
654
    def _KeepShared(ev_started, ev_got, ev_release):
655
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656
      self.assertTrue(self.sl._is_owned(shared=1))
657
      ev_got.set()
658
      ev_release.wait()
659
      self.assertTrue(self.sl._is_owned(shared=1))
660
      self.sl.release()
661

    
662
    # Acquire lock in exclusive mode
663
    ev_got_excl1 = threading.Event()
664
    ev_downgrade_excl1 = threading.Event()
665
    ev_release_excl1 = threading.Event()
666
    th_excl1 = self._addThread(target=_KeepExclusive,
667
                               args=(ev_got_excl1, ev_downgrade_excl1,
668
                                     ev_release_excl1))
669
    ev_got_excl1.wait()
670

    
671
    # Start a second exclusive acquire
672
    ev_started_excl2 = threading.Event()
673
    ev_release_excl2 = threading.Event()
674
    th_excl2 = self._addThread(target=_KeepExclusive2,
675
                               args=(ev_started_excl2, ev_release_excl2))
676
    ev_started_excl2.wait()
677

    
678
    # Start shared acquires, will jump ahead of second exclusive acquire when
679
    # first exclusive acquire downgrades
680
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681
    ev_release_shared = threading.Event()
682

    
683
    th_shared = [self._addThread(target=_KeepShared,
684
                                 args=(ev_started, ev_got, ev_release_shared))
685
                 for (ev_started, ev_got) in ev_shared]
686

    
687
    # Wait for all shared acquires to start
688
    for (ev, _) in ev_shared:
689
      ev.wait()
690

    
691
    # Check lock information
692
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
694
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
695
    self.assertEqual([(pendmode, sorted(waiting))
696
                      for (pendmode, waiting) in pending],
697
                     [("exclusive", [th_excl2.getName()]),
698
                      ("shared", sorted(th.getName() for th in th_shared))])
699

    
700
    # Shared acquires won't start until the exclusive lock is downgraded
701
    ev_downgrade_excl1.set()
702

    
703
    # Wait for all shared acquires to be successful
704
    for (_, ev) in ev_shared:
705
      ev.wait()
706

    
707
    # Check lock information again
708
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
709
                                              query.LQ_PENDING])),
710
                     [(self.sl.name, "shared", None,
711
                       [("exclusive", [th_excl2.getName()])])])
712
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
713
    self.assertEqual(set(owner), set([th_excl1.getName()] +
714
                                     [th.getName() for th in th_shared]))
715

    
716
    ev_release_excl1.set()
717
    ev_release_excl2.set()
718
    ev_release_shared.set()
719

    
720
    self._waitThreads()
721

    
722
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
723
                                              query.LQ_PENDING])),
724
                     [(self.sl.name, None, None, [])])
725

    
726
  @_Repeat
727
  def testMixedAcquireTimeout(self):
728
    sync = threading.Event()
729

    
730
    def _AcquireShared(ev):
731
      if not self.sl.acquire(shared=1, timeout=None):
732
        return
733

    
734
      self.done.put("shared")
735

    
736
      # Notify main thread
737
      ev.set()
738

    
739
      # Wait for notification from main thread
740
      sync.wait()
741

    
742
      # Release lock
743
      self.sl.release()
744

    
745
    acquires = []
746
    for _ in range(3):
747
      ev = threading.Event()
748
      self._addThread(target=_AcquireShared, args=(ev, ))
749
      acquires.append(ev)
750

    
751
    # Wait for all acquires to finish
752
    for i in acquires:
753
      i.wait()
754

    
755
    self.assertEqual(self.sl._count_pending(), 0)
756

    
757
    # Try to get exclusive lock
758
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
759

    
760
    # Acquire exclusive without timeout
761
    exclsync = threading.Event()
762
    exclev = threading.Event()
763

    
764
    def _AcquireExclusive():
765
      if not self.sl.acquire(shared=0):
766
        return
767

    
768
      self.done.put("exclusive")
769

    
770
      # Notify main thread
771
      exclev.set()
772

    
773
      # Wait for notification from main thread
774
      exclsync.wait()
775

    
776
      self.sl.release()
777

    
778
    self._addThread(target=_AcquireExclusive)
779

    
780
    # Try to get exclusive lock
781
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
782

    
783
    # Make all shared holders release their locks
784
    sync.set()
785

    
786
    # Wait for exclusive acquire to succeed
787
    exclev.wait()
788

    
789
    self.assertEqual(self.sl._count_pending(), 0)
790

    
791
    # Try to get exclusive lock
792
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
793

    
794
    def _AcquireSharedSimple():
795
      if self.sl.acquire(shared=1, timeout=None):
796
        self.done.put("shared2")
797
        self.sl.release()
798

    
799
    for _ in range(10):
800
      self._addThread(target=_AcquireSharedSimple)
801

    
802
    # Tell exclusive lock to release
803
    exclsync.set()
804

    
805
    # Wait for everything to finish
806
    self._waitThreads()
807

    
808
    self.assertEqual(self.sl._count_pending(), 0)
809

    
810
    # Check sequence
811
    for _ in range(3):
812
      self.assertEqual(self.done.get_nowait(), "shared")
813

    
814
    self.assertEqual(self.done.get_nowait(), "exclusive")
815

    
816
    for _ in range(10):
817
      self.assertEqual(self.done.get_nowait(), "shared2")
818

    
819
    self.assertRaises(Queue.Empty, self.done.get_nowait)
820

    
821
  def testPriority(self):
822
    # Acquire in exclusive mode
823
    self.assert_(self.sl.acquire(shared=0))
824

    
825
    # Queue acquires
826
    def _Acquire(prev, next, shared, priority, result):
827
      prev.wait()
828
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
829
      try:
830
        self.done.put(result)
831
      finally:
832
        self.sl.release()
833

    
834
    counter = itertools.count(0)
835
    priorities = range(-20, 30)
836
    first = threading.Event()
837
    prev = first
838

    
839
    # Data structure:
840
    # {
841
    #   priority:
842
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
843
    #      (shared/exclusive, ...),
844
    #      ...,
845
    #     ],
846
    # }
847
    perprio = {}
848

    
849
    # References shared acquire per priority in L{perprio}. Data structure:
850
    # {
851
    #   priority: (shared=1, set(acquire names), set(pending threads)),
852
    # }
853
    prioshared = {}
854

    
855
    for seed in [4979, 9523, 14902, 32440]:
856
      # Use a deterministic random generator
857
      rnd = random.Random(seed)
858
      for priority in [rnd.choice(priorities) for _ in range(30)]:
859
        modes = [0, 1]
860
        rnd.shuffle(modes)
861
        for shared in modes:
862
          # Unique name
863
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
864

    
865
          ev = threading.Event()
866
          thread = self._addThread(target=_Acquire,
867
                                   args=(prev, ev, shared, priority, acqname))
868
          prev = ev
869

    
870
          # Record expected aqcuire, see above for structure
871
          data = (shared, set([acqname]), set([thread]))
872
          priolist = perprio.setdefault(priority, [])
873
          if shared:
874
            priosh = prioshared.get(priority, None)
875
            if priosh:
876
              # Shared acquires are merged
877
              for i, j in zip(priosh[1:], data[1:]):
878
                i.update(j)
879
              assert data[0] == priosh[0]
880
            else:
881
              prioshared[priority] = data
882
              priolist.append(data)
883
          else:
884
            priolist.append(data)
885

    
886
    # Start all acquires and wait for them
887
    first.set()
888
    prev.wait()
889

    
890
    # Check lock information
891
    self.assertEqual(self.sl.GetLockInfo(set()),
892
                     [(self.sl.name, None, None, None)])
893
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
894
                     [(self.sl.name, "exclusive",
895
                       [threading.currentThread().getName()], None)])
896

    
897
    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
898
                            perprio)
899

    
900
    # Let threads acquire the lock
901
    self.sl.release()
902

    
903
    # Wait for everything to finish
904
    self._waitThreads()
905

    
906
    self.assert_(self.sl._check_empty())
907

    
908
    # Check acquires by priority
909
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
910
      for (_, names, _) in acquires:
911
        # For shared acquires, the set will contain 1..n entries. For exclusive
912
        # acquires only one.
913
        while names:
914
          names.remove(self.done.get_nowait())
915
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
916

    
917
    self.assertRaises(Queue.Empty, self.done.get_nowait)
918

    
919
  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
920
    self.assertEqual(name, self.sl.name)
921
    self.assert_(mode is None)
922
    self.assert_(owner is None)
923

    
924
    self.assertEqual([(pendmode, sorted(waiting))
925
                      for (pendmode, waiting) in pending],
926
                     [(["exclusive", "shared"][int(bool(shared))],
927
                       sorted(t.getName() for t in threads))
928
                      for acquires in [perprio[i]
929
                                       for i in sorted(perprio.keys())]
930
                      for (shared, _, threads) in acquires])
931

    
932

    
933
class TestSharedLockInCondition(_ThreadedTestCase):
934
  """SharedLock as a condition lock tests"""
935

    
936
  def setUp(self):
937
    _ThreadedTestCase.setUp(self)
938
    self.sl = locking.SharedLock("TestSharedLockInCondition")
939
    self.setCondition()
940

    
941
  def setCondition(self):
942
    self.cond = threading.Condition(self.sl)
943

    
944
  def testKeepMode(self):
945
    self.cond.acquire(shared=1)
946
    self.assert_(self.sl._is_owned(shared=1))
947
    self.cond.wait(0)
948
    self.assert_(self.sl._is_owned(shared=1))
949
    self.cond.release()
950
    self.cond.acquire(shared=0)
951
    self.assert_(self.sl._is_owned(shared=0))
952
    self.cond.wait(0)
953
    self.assert_(self.sl._is_owned(shared=0))
954
    self.cond.release()
955

    
956

    
957
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
958
  """SharedLock as a pipe condition lock tests"""
959

    
960
  def setCondition(self):
961
    self.cond = locking.PipeCondition(self.sl)
962

    
963

    
964
class TestSSynchronizedDecorator(_ThreadedTestCase):
965
  """Shared Lock Synchronized decorator test"""
966

    
967
  def setUp(self):
968
    _ThreadedTestCase.setUp(self)
969

    
970
  @locking.ssynchronized(_decoratorlock)
971
  def _doItExclusive(self):
972
    self.assert_(_decoratorlock._is_owned())
973
    self.done.put('EXC')
974

    
975
  @locking.ssynchronized(_decoratorlock, shared=1)
976
  def _doItSharer(self):
977
    self.assert_(_decoratorlock._is_owned(shared=1))
978
    self.done.put('SHR')
979

    
980
  def testDecoratedFunctions(self):
981
    self._doItExclusive()
982
    self.assertFalse(_decoratorlock._is_owned())
983
    self._doItSharer()
984
    self.assertFalse(_decoratorlock._is_owned())
985

    
986
  def testSharersCanCoexist(self):
987
    _decoratorlock.acquire(shared=1)
988
    threading.Thread(target=self._doItSharer).start()
989
    self.assert_(self.done.get(True, 1))
990
    _decoratorlock.release()
991

    
992
  @_Repeat
993
  def testExclusiveBlocksExclusive(self):
994
    _decoratorlock.acquire()
995
    self._addThread(target=self._doItExclusive)
996
    # give it a bit of time to check that it's not actually doing anything
997
    self.assertRaises(Queue.Empty, self.done.get_nowait)
998
    _decoratorlock.release()
999
    self._waitThreads()
1000
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1001

    
1002
  @_Repeat
1003
  def testExclusiveBlocksSharer(self):
1004
    _decoratorlock.acquire()
1005
    self._addThread(target=self._doItSharer)
1006
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1007
    _decoratorlock.release()
1008
    self._waitThreads()
1009
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1010

    
1011
  @_Repeat
1012
  def testSharerBlocksExclusive(self):
1013
    _decoratorlock.acquire(shared=1)
1014
    self._addThread(target=self._doItExclusive)
1015
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1016
    _decoratorlock.release()
1017
    self._waitThreads()
1018
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1019

    
1020

    
1021
class TestLockSet(_ThreadedTestCase):
1022
  """LockSet tests"""
1023

    
1024
  def setUp(self):
1025
    _ThreadedTestCase.setUp(self)
1026
    self._setUpLS()
1027

    
1028
  def _setUpLS(self):
1029
    """Helper to (re)initialize the lock set"""
1030
    self.resources = ['one', 'two', 'three']
1031
    self.ls = locking.LockSet(self.resources, "TestLockSet")
1032

    
1033
  def testResources(self):
1034
    self.assertEquals(self.ls._names(), set(self.resources))
1035
    newls = locking.LockSet([], "TestLockSet.testResources")
1036
    self.assertEquals(newls._names(), set())
1037

    
1038
  def testAcquireRelease(self):
1039
    self.assert_(self.ls.acquire('one'))
1040
    self.assertEquals(self.ls._list_owned(), set(['one']))
1041
    self.ls.release()
1042
    self.assertEquals(self.ls._list_owned(), set())
1043
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
1044
    self.assertEquals(self.ls._list_owned(), set(['one']))
1045
    self.ls.release()
1046
    self.assertEquals(self.ls._list_owned(), set())
1047
    self.ls.acquire(['one', 'two', 'three'])
1048
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1049
    self.ls.release('one')
1050
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
1051
    self.ls.release(['three'])
1052
    self.assertEquals(self.ls._list_owned(), set(['two']))
1053
    self.ls.release()
1054
    self.assertEquals(self.ls._list_owned(), set())
1055
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1056
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
1057
    self.ls.release()
1058
    self.assertEquals(self.ls._list_owned(), set())
1059

    
1060
  def testNoDoubleAcquire(self):
1061
    self.ls.acquire('one')
1062
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
1063
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1064
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1065
    self.ls.release()
1066
    self.ls.acquire(['one', 'three'])
1067
    self.ls.release('one')
1068
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1069
    self.ls.release('three')
1070

    
1071
  def testNoWrongRelease(self):
1072
    self.assertRaises(AssertionError, self.ls.release)
1073
    self.ls.acquire('one')
1074
    self.assertRaises(AssertionError, self.ls.release, 'two')
1075

    
1076
  def testAddRemove(self):
1077
    self.ls.add('four')
1078
    self.assertEquals(self.ls._list_owned(), set())
1079
    self.assert_('four' in self.ls._names())
1080
    self.ls.add(['five', 'six', 'seven'], acquired=1)
1081
    self.assert_('five' in self.ls._names())
1082
    self.assert_('six' in self.ls._names())
1083
    self.assert_('seven' in self.ls._names())
1084
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
1085
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1086
    self.assert_('five' not in self.ls._names())
1087
    self.assert_('six' not in self.ls._names())
1088
    self.assertEquals(self.ls._list_owned(), set(['seven']))
1089
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1090
    self.ls.remove('seven')
1091
    self.assert_('seven' not in self.ls._names())
1092
    self.assertEquals(self.ls._list_owned(), set([]))
1093
    self.ls.acquire(None, shared=1)
1094
    self.assertRaises(AssertionError, self.ls.add, 'eight')
1095
    self.ls.release()
1096
    self.ls.acquire(None)
1097
    self.ls.add('eight', acquired=1)
1098
    self.assert_('eight' in self.ls._names())
1099
    self.assert_('eight' in self.ls._list_owned())
1100
    self.ls.add('nine')
1101
    self.assert_('nine' in self.ls._names())
1102
    self.assert_('nine' not in self.ls._list_owned())
1103
    self.ls.release()
1104
    self.ls.remove(['two'])
1105
    self.assert_('two' not in self.ls._names())
1106
    self.ls.acquire('three')
1107
    self.assertEquals(self.ls.remove(['three']), ['three'])
1108
    self.assert_('three' not in self.ls._names())
1109
    self.assertEquals(self.ls.remove('three'), [])
1110
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1111
    self.assert_('one' not in self.ls._names())
1112

    
1113
  def testRemoveNonBlocking(self):
1114
    self.ls.acquire('one')
1115
    self.assertEquals(self.ls.remove('one'), ['one'])
1116
    self.ls.acquire(['two', 'three'])
1117
    self.assertEquals(self.ls.remove(['two', 'three']),
1118
                      ['two', 'three'])
1119

    
1120
  def testNoDoubleAdd(self):
1121
    self.assertRaises(errors.LockError, self.ls.add, 'two')
1122
    self.ls.add('four')
1123
    self.assertRaises(errors.LockError, self.ls.add, 'four')
1124

    
1125
  def testNoWrongRemoves(self):
1126
    self.ls.acquire(['one', 'three'], shared=1)
1127
    # Cannot remove 'two' while holding something which is not a superset
1128
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1129
    # Cannot remove 'three' as we are sharing it
1130
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1131

    
1132
  def testAcquireSetLock(self):
1133
    # acquire the set-lock exclusively
1134
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1135
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1136
    self.assertEquals(self.ls._is_owned(), True)
1137
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1138
    # I can still add/remove elements...
1139
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1140
    self.assert_(self.ls.add('six'))
1141
    self.ls.release()
1142
    # share the set-lock
1143
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1144
    # adding new elements is not possible
1145
    self.assertRaises(AssertionError, self.ls.add, 'five')
1146
    self.ls.release()
1147

    
1148
  def testAcquireWithRepetitions(self):
1149
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1150
                      set(['two', 'two', 'three']))
1151
    self.ls.release(['two', 'two'])
1152
    self.assertEquals(self.ls._list_owned(), set(['three']))
1153

    
1154
  def testEmptyAcquire(self):
1155
    # Acquire an empty list of locks...
1156
    self.assertEquals(self.ls.acquire([]), set())
1157
    self.assertEquals(self.ls._list_owned(), set())
1158
    # New locks can still be addded
1159
    self.assert_(self.ls.add('six'))
1160
    # "re-acquiring" is not an issue, since we had really acquired nothing
1161
    self.assertEquals(self.ls.acquire([], shared=1), set())
1162
    self.assertEquals(self.ls._list_owned(), set())
1163
    # We haven't really acquired anything, so we cannot release
1164
    self.assertRaises(AssertionError, self.ls.release)
1165

    
1166
  def _doLockSet(self, names, shared):
1167
    try:
1168
      self.ls.acquire(names, shared=shared)
1169
      self.done.put('DONE')
1170
      self.ls.release()
1171
    except errors.LockError:
1172
      self.done.put('ERR')
1173

    
1174
  def _doAddSet(self, names):
1175
    try:
1176
      self.ls.add(names, acquired=1)
1177
      self.done.put('DONE')
1178
      self.ls.release()
1179
    except errors.LockError:
1180
      self.done.put('ERR')
1181

    
1182
  def _doRemoveSet(self, names):
1183
    self.done.put(self.ls.remove(names))
1184

    
1185
  @_Repeat
1186
  def testConcurrentSharedAcquire(self):
1187
    self.ls.acquire(['one', 'two'], shared=1)
1188
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1189
    self._waitThreads()
1190
    self.assertEqual(self.done.get_nowait(), 'DONE')
1191
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1192
    self._waitThreads()
1193
    self.assertEqual(self.done.get_nowait(), 'DONE')
1194
    self._addThread(target=self._doLockSet, args=('three', 1))
1195
    self._waitThreads()
1196
    self.assertEqual(self.done.get_nowait(), 'DONE')
1197
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1198
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1199
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1200
    self.ls.release()
1201
    self._waitThreads()
1202
    self.assertEqual(self.done.get_nowait(), 'DONE')
1203
    self.assertEqual(self.done.get_nowait(), 'DONE')
1204

    
1205
  @_Repeat
1206
  def testConcurrentExclusiveAcquire(self):
1207
    self.ls.acquire(['one', 'two'])
1208
    self._addThread(target=self._doLockSet, args=('three', 1))
1209
    self._waitThreads()
1210
    self.assertEqual(self.done.get_nowait(), 'DONE')
1211
    self._addThread(target=self._doLockSet, args=('three', 0))
1212
    self._waitThreads()
1213
    self.assertEqual(self.done.get_nowait(), 'DONE')
1214
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1215
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1216
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1217
    self._addThread(target=self._doLockSet, args=('one', 0))
1218
    self._addThread(target=self._doLockSet, args=('one', 1))
1219
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1220
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1221
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1222
    self.ls.release()
1223
    self._waitThreads()
1224
    for _ in range(6):
1225
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1226

    
1227
  @_Repeat
1228
  def testSimpleAcquireTimeoutExpiring(self):
1229
    names = sorted(self.ls._names())
1230
    self.assert_(len(names) >= 3)
1231

    
1232
    # Get name of first lock
1233
    first = names[0]
1234

    
1235
    # Get name of last lock
1236
    last = names.pop()
1237

    
1238
    checks = [
1239
      # Block first and try to lock it again
1240
      (first, first),
1241

    
1242
      # Block last and try to lock all locks
1243
      (None, first),
1244

    
1245
      # Block last and try to lock it again
1246
      (last, last),
1247
      ]
1248

    
1249
    for (wanted, block) in checks:
1250
      # Lock in exclusive mode
1251
      self.assert_(self.ls.acquire(block, shared=0))
1252

    
1253
      def _AcquireOne():
1254
        # Try to get the same lock again with a timeout (should never succeed)
1255
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1256
        if acquired:
1257
          self.done.put("acquired")
1258
          self.ls.release()
1259
        else:
1260
          self.assert_(acquired is None)
1261
          self.assertFalse(self.ls._list_owned())
1262
          self.assertFalse(self.ls._is_owned())
1263
          self.done.put("not acquired")
1264

    
1265
      self._addThread(target=_AcquireOne)
1266

    
1267
      # Wait for timeout in thread to expire
1268
      self._waitThreads()
1269

    
1270
      # Release exclusive lock again
1271
      self.ls.release()
1272

    
1273
      self.assertEqual(self.done.get_nowait(), "not acquired")
1274
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1275

    
1276
  @_Repeat
1277
  def testDelayedAndExpiringLockAcquire(self):
1278
    self._setUpLS()
1279
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1280

    
1281
    for expire in (False, True):
1282
      names = sorted(self.ls._names())
1283
      self.assertEqual(len(names), 8)
1284

    
1285
      lock_ev = dict([(i, threading.Event()) for i in names])
1286

    
1287
      # Lock all in exclusive mode
1288
      self.assert_(self.ls.acquire(names, shared=0))
1289

    
1290
      if expire:
1291
        # We'll wait at least 300ms per lock
1292
        lockwait = len(names) * [0.3]
1293

    
1294
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1295
        # this gives us up to 2.4s to fail.
1296
        lockall_timeout = 0.4
1297
      else:
1298
        # This should finish rather quickly
1299
        lockwait = None
1300
        lockall_timeout = len(names) * 5.0
1301

    
1302
      def _LockAll():
1303
        def acquire_notification(name):
1304
          if not expire:
1305
            self.done.put("getting %s" % name)
1306

    
1307
          # Kick next lock
1308
          lock_ev[name].set()
1309

    
1310
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1311
                           test_notify=acquire_notification):
1312
          self.done.put("got all")
1313
          self.ls.release()
1314
        else:
1315
          self.done.put("timeout on all")
1316

    
1317
        # Notify all locks
1318
        for ev in lock_ev.values():
1319
          ev.set()
1320

    
1321
      t = self._addThread(target=_LockAll)
1322

    
1323
      for idx, name in enumerate(names):
1324
        # Wait for actual acquire on this lock to start
1325
        lock_ev[name].wait(10.0)
1326

    
1327
        if expire and t.isAlive():
1328
          # Wait some time after getting the notification to make sure the lock
1329
          # acquire will expire
1330
          SafeSleep(lockwait[idx])
1331

    
1332
        self.ls.release(names=name)
1333

    
1334
      self.assertFalse(self.ls._list_owned())
1335

    
1336
      self._waitThreads()
1337

    
1338
      if expire:
1339
        # Not checking which locks were actually acquired. Doing so would be
1340
        # too timing-dependant.
1341
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1342
      else:
1343
        for i in names:
1344
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1345
        self.assertEqual(self.done.get_nowait(), "got all")
1346
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1347

    
1348
  @_Repeat
1349
  def testConcurrentRemove(self):
1350
    self.ls.add('four')
1351
    self.ls.acquire(['one', 'two', 'four'])
1352
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1353
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1354
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1355
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1356
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1357
    self.ls.remove('one')
1358
    self.ls.release()
1359
    self._waitThreads()
1360
    for i in range(4):
1361
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1362
    self.ls.add(['five', 'six'], acquired=1)
1363
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1364
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1365
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1366
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1367
    self.ls.remove('five')
1368
    self.ls.release()
1369
    self._waitThreads()
1370
    for i in range(4):
1371
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1372
    self.ls.acquire(['three', 'four'])
1373
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1374
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1375
    self.ls.remove('four')
1376
    self._waitThreads()
1377
    self.assertEqual(self.done.get_nowait(), ['six'])
1378
    self._addThread(target=self._doRemoveSet, args=(['two']))
1379
    self._waitThreads()
1380
    self.assertEqual(self.done.get_nowait(), ['two'])
1381
    self.ls.release()
1382
    # reset lockset
1383
    self._setUpLS()
1384

    
1385
  @_Repeat
1386
  def testConcurrentSharedSetLock(self):
1387
    # share the set-lock...
1388
    self.ls.acquire(None, shared=1)
1389
    # ...another thread can share it too
1390
    self._addThread(target=self._doLockSet, args=(None, 1))
1391
    self._waitThreads()
1392
    self.assertEqual(self.done.get_nowait(), 'DONE')
1393
    # ...or just share some elements
1394
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1395
    self._waitThreads()
1396
    self.assertEqual(self.done.get_nowait(), 'DONE')
1397
    # ...but not add new ones or remove any
1398
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1399
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1400
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1401
    # this just releases the set-lock
1402
    self.ls.release([])
1403
    t.join(60)
1404
    self.assertEqual(self.done.get_nowait(), 'DONE')
1405
    # release the lock on the actual elements so remove() can proceed too
1406
    self.ls.release()
1407
    self._waitThreads()
1408
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1409
    # reset lockset
1410
    self._setUpLS()
1411

    
1412
  @_Repeat
1413
  def testConcurrentExclusiveSetLock(self):
1414
    # acquire the set-lock...
1415
    self.ls.acquire(None, shared=0)
1416
    # ...no one can do anything else
1417
    self._addThread(target=self._doLockSet, args=(None, 1))
1418
    self._addThread(target=self._doLockSet, args=(None, 0))
1419
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1420
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1421
    self._addThread(target=self._doAddSet, args=(['nine']))
1422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1423
    self.ls.release()
1424
    self._waitThreads()
1425
    for _ in range(5):
1426
      self.assertEqual(self.done.get(True, 1), 'DONE')
1427
    # cleanup
1428
    self._setUpLS()
1429

    
1430
  @_Repeat
1431
  def testConcurrentSetLockAdd(self):
1432
    self.ls.acquire('one')
1433
    # Another thread wants the whole SetLock
1434
    self._addThread(target=self._doLockSet, args=(None, 0))
1435
    self._addThread(target=self._doLockSet, args=(None, 1))
1436
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1437
    self.assertRaises(AssertionError, self.ls.add, 'four')
1438
    self.ls.release()
1439
    self._waitThreads()
1440
    self.assertEqual(self.done.get_nowait(), 'DONE')
1441
    self.assertEqual(self.done.get_nowait(), 'DONE')
1442
    self.ls.acquire(None)
1443
    self._addThread(target=self._doLockSet, args=(None, 0))
1444
    self._addThread(target=self._doLockSet, args=(None, 1))
1445
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1446
    self.ls.add('four')
1447
    self.ls.add('five', acquired=1)
1448
    self.ls.add('six', acquired=1, shared=1)
1449
    self.assertEquals(self.ls._list_owned(),
1450
      set(['one', 'two', 'three', 'five', 'six']))
1451
    self.assertEquals(self.ls._is_owned(), True)
1452
    self.assertEquals(self.ls._names(),
1453
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1454
    self.ls.release()
1455
    self._waitThreads()
1456
    self.assertEqual(self.done.get_nowait(), 'DONE')
1457
    self.assertEqual(self.done.get_nowait(), 'DONE')
1458
    self._setUpLS()
1459

    
1460
  @_Repeat
1461
  def testEmptyLockSet(self):
1462
    # get the set-lock
1463
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1464
    # now empty it...
1465
    self.ls.remove(['one', 'two', 'three'])
1466
    # and adds/locks by another thread still wait
1467
    self._addThread(target=self._doAddSet, args=(['nine']))
1468
    self._addThread(target=self._doLockSet, args=(None, 1))
1469
    self._addThread(target=self._doLockSet, args=(None, 0))
1470
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1471
    self.ls.release()
1472
    self._waitThreads()
1473
    for _ in range(3):
1474
      self.assertEqual(self.done.get_nowait(), 'DONE')
1475
    # empty it again...
1476
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1477
    # now share it...
1478
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1479
    # other sharers can go, adds still wait
1480
    self._addThread(target=self._doLockSet, args=(None, 1))
1481
    self._waitThreads()
1482
    self.assertEqual(self.done.get_nowait(), 'DONE')
1483
    self._addThread(target=self._doAddSet, args=(['nine']))
1484
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1485
    self.ls.release()
1486
    self._waitThreads()
1487
    self.assertEqual(self.done.get_nowait(), 'DONE')
1488
    self._setUpLS()
1489

    
1490
  def testAcquireWithNamesDowngrade(self):
1491
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1492
    self.assertTrue(self.ls._is_owned())
1493
    self.assertFalse(self.ls._get_lock()._is_owned())
1494
    self.ls.release()
1495
    self.assertFalse(self.ls._is_owned())
1496
    self.assertFalse(self.ls._get_lock()._is_owned())
1497
    # Can't downgrade after releasing
1498
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1499

    
1500
  def testDowngrade(self):
1501
    # Not owning anything, must raise an exception
1502
    self.assertFalse(self.ls._is_owned())
1503
    self.assertRaises(AssertionError, self.ls.downgrade)
1504

    
1505
    self.assertFalse(compat.any(i._is_owned()
1506
                                for i in self.ls._get_lockdict().values()))
1507

    
1508
    self.assertEquals(self.ls.acquire(None, shared=0),
1509
                      set(["one", "two", "three"]))
1510
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1511

    
1512
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1513
    self.assertTrue(compat.all(i._is_owned(shared=0)
1514
                               for i in self.ls._get_lockdict().values()))
1515

    
1516
    # Start downgrading locks
1517
    self.assertTrue(self.ls.downgrade(names=["one"]))
1518
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1519
    self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1520
                               for name, lock in
1521
                                 self.ls._get_lockdict().items()))
1522

    
1523
    self.assertTrue(self.ls.downgrade(names="two"))
1524
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1525
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1526
    self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1527
                               for name, lock in
1528
                                 self.ls._get_lockdict().items()))
1529

    
1530
    # Downgrading the last exclusive lock to shared must downgrade the
1531
    # lockset-internal lock too
1532
    self.assertTrue(self.ls.downgrade(names="three"))
1533
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1534
    self.assertTrue(compat.all(i._is_owned(shared=1)
1535
                               for i in self.ls._get_lockdict().values()))
1536

    
1537
    # Downgrading a shared lock must be a no-op
1538
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1539
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1540
    self.assertTrue(compat.all(i._is_owned(shared=1)
1541
                               for i in self.ls._get_lockdict().values()))
1542

    
1543
    self.ls.release()
1544

    
1545
  def testPriority(self):
1546
    def _Acquire(prev, next, name, priority, success_fn):
1547
      prev.wait()
1548
      self.assert_(self.ls.acquire(name, shared=0,
1549
                                   priority=priority,
1550
                                   test_notify=lambda _: next.set()))
1551
      try:
1552
        success_fn()
1553
      finally:
1554
        self.ls.release()
1555

    
1556
    # Get all in exclusive mode
1557
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1558

    
1559
    done_two = Queue.Queue(0)
1560

    
1561
    first = threading.Event()
1562
    prev = first
1563

    
1564
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1565
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1566

    
1567
    # Use a deterministic random generator
1568
    random.Random(741).shuffle(acquires)
1569

    
1570
    for (name, prio, done) in acquires:
1571
      ev = threading.Event()
1572
      self._addThread(target=_Acquire,
1573
                      args=(prev, ev, name, prio,
1574
                            compat.partial(done.put, "Prio%s" % prio)))
1575
      prev = ev
1576

    
1577
    # Start acquires
1578
    first.set()
1579

    
1580
    # Wait for last acquire to start
1581
    prev.wait()
1582

    
1583
    # Let threads acquire locks
1584
    self.ls.release()
1585

    
1586
    # Wait for threads to finish
1587
    self._waitThreads()
1588

    
1589
    for i in range(1, 33):
1590
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1591
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1592

    
1593
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1594
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1595

    
1596

    
1597
class TestGanetiLockManager(_ThreadedTestCase):
1598

    
1599
  def setUp(self):
1600
    _ThreadedTestCase.setUp(self)
1601
    self.nodes=['n1', 'n2']
1602
    self.nodegroups=['g1', 'g2']
1603
    self.instances=['i1', 'i2', 'i3']
1604
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1605
                                        self.instances)
1606

    
1607
  def tearDown(self):
1608
    # Don't try this at home...
1609
    locking.GanetiLockManager._instance = None
1610

    
1611
  def testLockingConstants(self):
1612
    # The locking library internally cheats by assuming its constants have some
1613
    # relationships with each other. Check those hold true.
1614
    # This relationship is also used in the Processor to recursively acquire
1615
    # the right locks. Again, please don't break it.
1616
    for i in range(len(locking.LEVELS)):
1617
      self.assertEqual(i, locking.LEVELS[i])
1618

    
1619
  def testDoubleGLFails(self):
1620
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1621

    
1622
  def testLockNames(self):
1623
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1624
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1625
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1626
                     set(self.nodegroups))
1627
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1628
                     set(self.instances))
1629

    
1630
  def testInitAndResources(self):
1631
    locking.GanetiLockManager._instance = None
1632
    self.GL = locking.GanetiLockManager([], [], [])
1633
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1634
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1635
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1636
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1637

    
1638
    locking.GanetiLockManager._instance = None
1639
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1640
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1641
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1642
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1643
                                    set(self.nodegroups))
1644
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1645

    
1646
    locking.GanetiLockManager._instance = None
1647
    self.GL = locking.GanetiLockManager([], [], self.instances)
1648
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1649
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1650
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1651
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1652
                     set(self.instances))
1653

    
1654
  def testAcquireRelease(self):
1655
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1656
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1657
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1658
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1659
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1660
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1661
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1662
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1663
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1664
    self.GL.release(locking.LEVEL_NODE)
1665
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1666
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1667
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1668
    self.GL.release(locking.LEVEL_NODEGROUP)
1669
    self.GL.release(locking.LEVEL_INSTANCE)
1670
    self.assertRaises(errors.LockError, self.GL.acquire,
1671
                      locking.LEVEL_INSTANCE, ['i5'])
1672
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1673
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1674

    
1675
  def testAcquireWholeSets(self):
1676
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1677
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1678
                      set(self.instances))
1679
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1680
                      set(self.instances))
1681
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1682
                      set(self.nodegroups))
1683
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1684
                      set(self.nodegroups))
1685
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1686
                      set(self.nodes))
1687
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1688
                      set(self.nodes))
1689
    self.GL.release(locking.LEVEL_NODE)
1690
    self.GL.release(locking.LEVEL_NODEGROUP)
1691
    self.GL.release(locking.LEVEL_INSTANCE)
1692
    self.GL.release(locking.LEVEL_CLUSTER)
1693

    
1694
  def testAcquireWholeAndPartial(self):
1695
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1696
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1697
                      set(self.instances))
1698
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1699
                      set(self.instances))
1700
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1701
                      set(['n2']))
1702
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1703
                      set(['n2']))
1704
    self.GL.release(locking.LEVEL_NODE)
1705
    self.GL.release(locking.LEVEL_INSTANCE)
1706
    self.GL.release(locking.LEVEL_CLUSTER)
1707

    
1708
  def testBGLDependency(self):
1709
    self.assertRaises(AssertionError, self.GL.acquire,
1710
                      locking.LEVEL_NODE, ['n1', 'n2'])
1711
    self.assertRaises(AssertionError, self.GL.acquire,
1712
                      locking.LEVEL_INSTANCE, ['i3'])
1713
    self.assertRaises(AssertionError, self.GL.acquire,
1714
                      locking.LEVEL_NODEGROUP, ['g1'])
1715
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1716
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1717
    self.assertRaises(AssertionError, self.GL.release,
1718
                      locking.LEVEL_CLUSTER, ['BGL'])
1719
    self.assertRaises(AssertionError, self.GL.release,
1720
                      locking.LEVEL_CLUSTER)
1721
    self.GL.release(locking.LEVEL_NODE)
1722
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1723
    self.assertRaises(AssertionError, self.GL.release,
1724
                      locking.LEVEL_CLUSTER, ['BGL'])
1725
    self.assertRaises(AssertionError, self.GL.release,
1726
                      locking.LEVEL_CLUSTER)
1727
    self.GL.release(locking.LEVEL_INSTANCE)
1728
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1729
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1730
    self.assertRaises(AssertionError, self.GL.release,
1731
                      locking.LEVEL_CLUSTER, ['BGL'])
1732
    self.assertRaises(AssertionError, self.GL.release,
1733
                      locking.LEVEL_CLUSTER)
1734
    self.GL.release(locking.LEVEL_NODEGROUP)
1735
    self.GL.release(locking.LEVEL_CLUSTER)
1736

    
1737
  def testWrongOrder(self):
1738
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1739
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1740
    self.assertRaises(AssertionError, self.GL.acquire,
1741
                      locking.LEVEL_NODE, ['n1'])
1742
    self.assertRaises(AssertionError, self.GL.acquire,
1743
                      locking.LEVEL_NODEGROUP, ['g1'])
1744
    self.assertRaises(AssertionError, self.GL.acquire,
1745
                      locking.LEVEL_INSTANCE, ['i2'])
1746

    
1747
  def testModifiableLevels(self):
1748
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1749
                      ['BGL2'])
1750
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1751
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1752
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1753
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1754
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1755
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1756
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1757
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1758
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1759
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1760
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1761
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1762
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1763
                      ['BGL2'])
1764

    
1765
  # Helper function to run as a thread that shared the BGL and then acquires
1766
  # some locks at another level.
1767
  def _doLock(self, level, names, shared):
1768
    try:
1769
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1770
      self.GL.acquire(level, names, shared=shared)
1771
      self.done.put('DONE')
1772
      self.GL.release(level)
1773
      self.GL.release(locking.LEVEL_CLUSTER)
1774
    except errors.LockError:
1775
      self.done.put('ERR')
1776

    
1777
  @_Repeat
1778
  def testConcurrency(self):
1779
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1780
    self._addThread(target=self._doLock,
1781
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1782
    self._waitThreads()
1783
    self.assertEqual(self.done.get_nowait(), 'DONE')
1784
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1785
    self._addThread(target=self._doLock,
1786
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1787
    self._waitThreads()
1788
    self.assertEqual(self.done.get_nowait(), 'DONE')
1789
    self._addThread(target=self._doLock,
1790
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1791
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1792
    self.GL.release(locking.LEVEL_INSTANCE)
1793
    self._waitThreads()
1794
    self.assertEqual(self.done.get_nowait(), 'DONE')
1795
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1796
    self._addThread(target=self._doLock,
1797
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1798
    self._waitThreads()
1799
    self.assertEqual(self.done.get_nowait(), 'DONE')
1800
    self._addThread(target=self._doLock,
1801
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1802
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1803
    self.GL.release(locking.LEVEL_INSTANCE)
1804
    self._waitThreads()
1805
    self.assertEqual(self.done.get(True, 1), 'DONE')
1806
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1807

    
1808

    
1809
class TestLockMonitor(_ThreadedTestCase):
1810
  def setUp(self):
1811
    _ThreadedTestCase.setUp(self)
1812
    self.lm = locking.LockMonitor()
1813

    
1814
  def testSingleThread(self):
1815
    locks = []
1816

    
1817
    for i in range(100):
1818
      name = "TestLock%s" % i
1819
      locks.append(locking.SharedLock(name, monitor=self.lm))
1820

    
1821
    self.assertEqual(len(self.lm._locks), len(locks))
1822
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1823
    self.assertEqual(len(result.fields), 1)
1824
    self.assertEqual(len(result.data), 100)
1825

    
1826
    # Delete all locks
1827
    del locks[:]
1828

    
1829
    # The garbage collector might needs some time
1830
    def _CheckLocks():
1831
      if self.lm._locks:
1832
        raise utils.RetryAgain()
1833

    
1834
    utils.Retry(_CheckLocks, 0.1, 30.0)
1835

    
1836
    self.assertFalse(self.lm._locks)
1837

    
1838
  def testMultiThread(self):
1839
    locks = []
1840

    
1841
    def _CreateLock(prev, next, name):
1842
      prev.wait()
1843
      locks.append(locking.SharedLock(name, monitor=self.lm))
1844
      if next:
1845
        next.set()
1846

    
1847
    expnames = []
1848

    
1849
    first = threading.Event()
1850
    prev = first
1851

    
1852
    # Use a deterministic random generator
1853
    for i in random.Random(4263).sample(range(100), 33):
1854
      name = "MtTestLock%s" % i
1855
      expnames.append(name)
1856

    
1857
      ev = threading.Event()
1858
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1859
      prev = ev
1860

    
1861
    # Add locks
1862
    first.set()
1863
    self._waitThreads()
1864

    
1865
    # Check order in which locks were added
1866
    self.assertEqual([i.name for i in locks], expnames)
1867

    
1868
    # Check query result
1869
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1870
    self.assert_(isinstance(result, dict))
1871
    response = objects.QueryResponse.FromDict(result)
1872
    self.assertEqual(response.data,
1873
                     [[(constants.RS_NORMAL, name),
1874
                       (constants.RS_NORMAL, None),
1875
                       (constants.RS_NORMAL, None),
1876
                       (constants.RS_NORMAL, [])]
1877
                      for name in utils.NiceSort(expnames)])
1878
    self.assertEqual(len(response.fields), 4)
1879
    self.assertEqual(["name", "mode", "owner", "pending"],
1880
                     [fdef.name for fdef in response.fields])
1881

    
1882
    # Test exclusive acquire
1883
    for tlock in locks[::4]:
1884
      tlock.acquire(shared=0)
1885
      try:
1886
        def _GetExpResult(name):
1887
          if tlock.name == name:
1888
            return [(constants.RS_NORMAL, name),
1889
                    (constants.RS_NORMAL, "exclusive"),
1890
                    (constants.RS_NORMAL,
1891
                     [threading.currentThread().getName()]),
1892
                    (constants.RS_NORMAL, [])]
1893
          return [(constants.RS_NORMAL, name),
1894
                  (constants.RS_NORMAL, None),
1895
                  (constants.RS_NORMAL, None),
1896
                  (constants.RS_NORMAL, [])]
1897

    
1898
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1899
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1900
                         [_GetExpResult(name)
1901
                          for name in utils.NiceSort(expnames)])
1902
      finally:
1903
        tlock.release()
1904

    
1905
    # Test shared acquire
1906
    def _Acquire(lock, shared, ev, notify):
1907
      lock.acquire(shared=shared)
1908
      try:
1909
        notify.set()
1910
        ev.wait()
1911
      finally:
1912
        lock.release()
1913

    
1914
    for tlock1 in locks[::11]:
1915
      for tlock2 in locks[::-15]:
1916
        if tlock2 == tlock1:
1917
          # Avoid deadlocks
1918
          continue
1919

    
1920
        for tlock3 in locks[::10]:
1921
          if tlock3 in (tlock2, tlock1):
1922
            # Avoid deadlocks
1923
            continue
1924

    
1925
          releaseev = threading.Event()
1926

    
1927
          # Acquire locks
1928
          acquireev = []
1929
          tthreads1 = []
1930
          for i in range(3):
1931
            ev = threading.Event()
1932
            tthreads1.append(self._addThread(target=_Acquire,
1933
                                             args=(tlock1, 1, releaseev, ev)))
1934
            acquireev.append(ev)
1935

    
1936
          ev = threading.Event()
1937
          tthread2 = self._addThread(target=_Acquire,
1938
                                     args=(tlock2, 1, releaseev, ev))
1939
          acquireev.append(ev)
1940

    
1941
          ev = threading.Event()
1942
          tthread3 = self._addThread(target=_Acquire,
1943
                                     args=(tlock3, 0, releaseev, ev))
1944
          acquireev.append(ev)
1945

    
1946
          # Wait for all locks to be acquired
1947
          for i in acquireev:
1948
            i.wait()
1949

    
1950
          # Check query result
1951
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1952
          response = objects.QueryResponse.FromDict(result)
1953
          for (name, mode, owner) in response.data:
1954
            (name_status, name_value) = name
1955
            (owner_status, owner_value) = owner
1956

    
1957
            self.assertEqual(name_status, constants.RS_NORMAL)
1958
            self.assertEqual(owner_status, constants.RS_NORMAL)
1959

    
1960
            if name_value == tlock1.name:
1961
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1962
              self.assertEqual(set(owner_value),
1963
                               set(i.getName() for i in tthreads1))
1964
              continue
1965

    
1966
            if name_value == tlock2.name:
1967
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1968
              self.assertEqual(owner_value, [tthread2.getName()])
1969
              continue
1970

    
1971
            if name_value == tlock3.name:
1972
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1973
              self.assertEqual(owner_value, [tthread3.getName()])
1974
              continue
1975

    
1976
            self.assert_(name_value in expnames)
1977
            self.assertEqual(mode, (constants.RS_NORMAL, None))
1978
            self.assert_(owner_value is None)
1979

    
1980
          # Release locks again
1981
          releaseev.set()
1982

    
1983
          self._waitThreads()
1984

    
1985
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1986
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
1987
                           [[(constants.RS_NORMAL, name),
1988
                             (constants.RS_NORMAL, None),
1989
                             (constants.RS_NORMAL, None)]
1990
                            for name in utils.NiceSort(expnames)])
1991

    
1992
  def testDelete(self):
1993
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1994

    
1995
    self.assertEqual(len(self.lm._locks), 1)
1996
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1997
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1998
                     [[(constants.RS_NORMAL, lock.name),
1999
                       (constants.RS_NORMAL, None),
2000
                       (constants.RS_NORMAL, None)]])
2001

    
2002
    lock.delete()
2003

    
2004
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2005
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2006
                     [[(constants.RS_NORMAL, lock.name),
2007
                       (constants.RS_NORMAL, "deleted"),
2008
                       (constants.RS_NORMAL, None)]])
2009
    self.assertEqual(len(self.lm._locks), 1)
2010

    
2011
  def testPending(self):
2012
    def _Acquire(lock, shared, prev, next):
2013
      prev.wait()
2014

    
2015
      lock.acquire(shared=shared, test_notify=next.set)
2016
      try:
2017
        pass
2018
      finally:
2019
        lock.release()
2020

    
2021
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
2022

    
2023
    for shared in [0, 1]:
2024
      lock.acquire()
2025
      try:
2026
        self.assertEqual(len(self.lm._locks), 1)
2027
        result = self.lm.QueryLocks(["name", "mode", "owner"])
2028
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2029
                         [[(constants.RS_NORMAL, lock.name),
2030
                           (constants.RS_NORMAL, "exclusive"),
2031
                           (constants.RS_NORMAL,
2032
                            [threading.currentThread().getName()])]])
2033

    
2034
        threads = []
2035

    
2036
        first = threading.Event()
2037
        prev = first
2038

    
2039
        for i in range(5):
2040
          ev = threading.Event()
2041
          threads.append(self._addThread(target=_Acquire,
2042
                                          args=(lock, shared, prev, ev)))
2043
          prev = ev
2044

    
2045
        # Start acquires
2046
        first.set()
2047

    
2048
        # Wait for last acquire to start waiting
2049
        prev.wait()
2050

    
2051
        # NOTE: This works only because QueryLocks will acquire the
2052
        # lock-internal lock again and won't be able to get the information
2053
        # until it has the lock. By then the acquire should be registered in
2054
        # SharedLock.__pending (otherwise it's a bug).
2055

    
2056
        # All acquires are waiting now
2057
        if shared:
2058
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2059
        else:
2060
          pending = [("exclusive", [t.getName()]) for t in threads]
2061

    
2062
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2063
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2064
                         [[(constants.RS_NORMAL, lock.name),
2065
                           (constants.RS_NORMAL, "exclusive"),
2066
                           (constants.RS_NORMAL,
2067
                            [threading.currentThread().getName()]),
2068
                           (constants.RS_NORMAL, pending)]])
2069

    
2070
        self.assertEqual(len(self.lm._locks), 1)
2071
      finally:
2072
        lock.release()
2073

    
2074
      self._waitThreads()
2075

    
2076
      # No pending acquires
2077
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2078
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2079
                       [[(constants.RS_NORMAL, lock.name),
2080
                         (constants.RS_NORMAL, None),
2081
                         (constants.RS_NORMAL, None),
2082
                         (constants.RS_NORMAL, [])]])
2083

    
2084
      self.assertEqual(len(self.lm._locks), 1)
2085

    
2086
  def testDeleteAndRecreate(self):
2087
    lname = "TestLock101923193"
2088

    
2089
    # Create some locks with the same name and keep all references
2090
    locks = [locking.SharedLock(lname, monitor=self.lm)
2091
             for _ in range(5)]
2092

    
2093
    self.assertEqual(len(self.lm._locks), len(locks))
2094

    
2095
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2096
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2097
                     [[(constants.RS_NORMAL, lname),
2098
                       (constants.RS_NORMAL, None),
2099
                       (constants.RS_NORMAL, None)]] * 5)
2100

    
2101
    locks[2].delete()
2102

    
2103
    # Check information order
2104
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2105
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2106
                     [[(constants.RS_NORMAL, lname),
2107
                       (constants.RS_NORMAL, None),
2108
                       (constants.RS_NORMAL, None)]] * 2 +
2109
                     [[(constants.RS_NORMAL, lname),
2110
                       (constants.RS_NORMAL, "deleted"),
2111
                       (constants.RS_NORMAL, None)]] +
2112
                     [[(constants.RS_NORMAL, lname),
2113
                       (constants.RS_NORMAL, None),
2114
                       (constants.RS_NORMAL, None)]] * 2)
2115

    
2116
    locks[1].acquire(shared=0)
2117

    
2118
    last_status = [
2119
      [(constants.RS_NORMAL, lname),
2120
       (constants.RS_NORMAL, None),
2121
       (constants.RS_NORMAL, None)],
2122
      [(constants.RS_NORMAL, lname),
2123
       (constants.RS_NORMAL, "exclusive"),
2124
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
2125
      [(constants.RS_NORMAL, lname),
2126
       (constants.RS_NORMAL, "deleted"),
2127
       (constants.RS_NORMAL, None)],
2128
      [(constants.RS_NORMAL, lname),
2129
       (constants.RS_NORMAL, None),
2130
       (constants.RS_NORMAL, None)],
2131
      [(constants.RS_NORMAL, lname),
2132
       (constants.RS_NORMAL, None),
2133
       (constants.RS_NORMAL, None)],
2134
      ]
2135

    
2136
    # Check information order
2137
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2138
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2139

    
2140
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2141
    self.assertEqual(len(self.lm._locks), len(locks))
2142

    
2143
    # Check lock deletion
2144
    for idx in range(len(locks)):
2145
      del locks[0]
2146
      assert gc.isenabled()
2147
      gc.collect()
2148
      self.assertEqual(len(self.lm._locks), len(locks))
2149
      result = self.lm.QueryLocks(["name", "mode", "owner"])
2150
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2151
                       last_status[idx + 1:])
2152

    
2153
    # All locks should have been deleted
2154
    assert not locks
2155
    self.assertFalse(self.lm._locks)
2156

    
2157
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2158
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2159

    
2160
  class _FakeLock:
2161
    def __init__(self):
2162
      self._info = []
2163

    
2164
    def AddResult(self, *args):
2165
      self._info.append(args)
2166

    
2167
    def CountPending(self):
2168
      return len(self._info)
2169

    
2170
    def GetLockInfo(self, requested):
2171
      (exp_requested, result) = self._info.pop(0)
2172

    
2173
      if exp_requested != requested:
2174
        raise Exception("Requested information (%s) does not match"
2175
                        " expectations (%s)" % (requested, exp_requested))
2176

    
2177
      return result
2178

    
2179
  def testMultipleResults(self):
2180
    fl1 = self._FakeLock()
2181
    fl2 = self._FakeLock()
2182

    
2183
    self.lm.RegisterLock(fl1)
2184
    self.lm.RegisterLock(fl2)
2185

    
2186
    # Empty information
2187
    for i in [fl1, fl2]:
2188
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2189
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2190
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2191
    for i in [fl1, fl2]:
2192
      self.assertEqual(i.CountPending(), 0)
2193

    
2194
    # Check ordering
2195
    for fn in [lambda x: x, reversed, sorted]:
2196
      fl1.AddResult(set(), list(fn([
2197
        ("aaa", None, None, None),
2198
        ("bbb", None, None, None),
2199
        ])))
2200
      fl2.AddResult(set(), [])
2201
      result = self.lm.QueryLocks(["name"])
2202
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2203
        [(constants.RS_NORMAL, "aaa")],
2204
        [(constants.RS_NORMAL, "bbb")],
2205
        ])
2206
      for i in [fl1, fl2]:
2207
        self.assertEqual(i.CountPending(), 0)
2208

    
2209
      for fn2 in [lambda x: x, reversed, sorted]:
2210
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
2211
          # Same name, but different information
2212
          ("aaa", "mode0", None, None),
2213
          ("aaa", "mode1", None, None),
2214
          ("aaa", "mode2", None, None),
2215
          ("aaa", "mode3", None, None),
2216
          ])))
2217
        fl2.AddResult(set([query.LQ_MODE]), [
2218
          ("zzz", "end", None, None),
2219
          ("000", "start", None, None),
2220
          ] + list(fn2([
2221
          ("aaa", "b200", None, None),
2222
          ("aaa", "b300", None, None),
2223
          ])))
2224
        result = self.lm.QueryLocks(["name", "mode"])
2225
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2226
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2227
          ] + list(fn([
2228
          # Name is the same, so order must be equal to incoming order
2229
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2230
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2231
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2232
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2233
          ])) + list(fn2([
2234
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2235
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2236
          ])) + [
2237
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2238
          ])
2239
        for i in [fl1, fl2]:
2240
          self.assertEqual(i.CountPending(), 0)
2241

    
2242

    
2243
if __name__ == '__main__':
2244
  testutils.GanetiTestProgram()