Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 75d81fc8

History | View | Annotate | Download (73.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190
    # This new thread can't acquire the lock, and thus call wait, before we
191
    # release it
192
    self._addThread(target=fn)
193
    self.cond.notifyAll()
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195
    self.cond.release()
196

    
197
    # We should now get 3 W and 1 A (for the new thread) in whatever order
198
    w = 0
199
    a = 0
200
    for i in range(4):
201
      got = self.done.get(True, 1)
202
      if got == "W":
203
        w += 1
204
      elif got == "A":
205
        a += 1
206
      else:
207
        self.fail("Got %s on the done queue" % got)
208

    
209
    self.assertEqual(w, 3)
210
    self.assertEqual(a, 1)
211

    
212
    self.cond.acquire()
213
    self.cond.notifyAll()
214
    self.cond.release()
215
    self._waitThreads()
216
    self.assertEqual(self.done.get_nowait(), "W")
217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
218

    
219
  def testBlockingWait(self):
220
    def _BlockingWait():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(None)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_BlockingWait)
228

    
229
  def testLongTimeoutWait(self):
230
    def _Helper():
231
      self.cond.acquire()
232
      self.done.put("A")
233
      self.cond.wait(15.0)
234
      self.cond.release()
235
      self.done.put("W")
236

    
237
    self._TestWait(_Helper)
238

    
239
  def _TimeoutWait(self, timeout, check):
240
    self.cond.acquire()
241
    self.cond.wait(timeout)
242
    self.cond.release()
243
    self.done.put(check)
244

    
245
  def testShortTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248
    self._waitThreads()
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertEqual(self.done.get_nowait(), "T1")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253
  def testZeroTimeoutWait(self):
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257
    self._waitThreads()
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertEqual(self.done.get_nowait(), "T0")
261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262

    
263

    
264
class TestSharedLock(_ThreadedTestCase):
265
  """SharedLock tests"""
266

    
267
  def setUp(self):
268
    _ThreadedTestCase.setUp(self)
269
    self.sl = locking.SharedLock("TestSharedLock")
270

    
271
  def testSequenceAndOwnership(self):
272
    self.assertFalse(self.sl._is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl._is_owned())
275
    self.assert_(self.sl._is_owned(shared=1))
276
    self.assertFalse(self.sl._is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl._is_owned())
279
    self.sl.acquire()
280
    self.assert_(self.sl._is_owned())
281
    self.assertFalse(self.sl._is_owned(shared=1))
282
    self.assert_(self.sl._is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl._is_owned())
285
    self.sl.acquire(shared=1)
286
    self.assert_(self.sl._is_owned())
287
    self.assert_(self.sl._is_owned(shared=1))
288
    self.assertFalse(self.sl._is_owned(shared=0))
289
    self.sl.release()
290
    self.assertFalse(self.sl._is_owned())
291

    
292
  def testBooleanValue(self):
293
    # semaphores are supposed to return a true value on a successful acquire
294
    self.assert_(self.sl.acquire(shared=1))
295
    self.sl.release()
296
    self.assert_(self.sl.acquire())
297
    self.sl.release()
298

    
299
  def testDoubleLockingStoE(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  def testDoubleLockingEtoS(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingStoS(self):
308
    self.sl.acquire(shared=1)
309
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310

    
311
  def testDoubleLockingEtoE(self):
312
    self.sl.acquire()
313
    self.assertRaises(AssertionError, self.sl.acquire)
314

    
315
  # helper functions: called in a separate thread they acquire the lock, send
316
  # their identifier on the done queue, then release it.
317
  def _doItSharer(self):
318
    try:
319
      self.sl.acquire(shared=1)
320
      self.done.put('SHR')
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put('ERR')
324

    
325
  def _doItExclusive(self):
326
    try:
327
      self.sl.acquire()
328
      self.done.put('EXC')
329
      self.sl.release()
330
    except errors.LockError:
331
      self.done.put('ERR')
332

    
333
  def _doItDelete(self):
334
    try:
335
      self.sl.delete()
336
      self.done.put('DEL')
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def testSharersCanCoexist(self):
341
    self.sl.acquire(shared=1)
342
    threading.Thread(target=self._doItSharer).start()
343
    self.assert_(self.done.get(True, 1))
344
    self.sl.release()
345

    
346
  @_Repeat
347
  def testExclusiveBlocksExclusive(self):
348
    self.sl.acquire()
349
    self._addThread(target=self._doItExclusive)
350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
351
    self.sl.release()
352
    self._waitThreads()
353
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354

    
355
  @_Repeat
356
  def testExclusiveBlocksDelete(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItDelete)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363
    self.sl = locking.SharedLock(self.sl.name)
364

    
365
  @_Repeat
366
  def testExclusiveBlocksSharer(self):
367
    self.sl.acquire()
368
    self._addThread(target=self._doItSharer)
369
    self.assertRaises(Queue.Empty, self.done.get_nowait)
370
    self.sl.release()
371
    self._waitThreads()
372
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373

    
374
  @_Repeat
375
  def testSharerBlocksExclusive(self):
376
    self.sl.acquire(shared=1)
377
    self._addThread(target=self._doItExclusive)
378
    self.assertRaises(Queue.Empty, self.done.get_nowait)
379
    self.sl.release()
380
    self._waitThreads()
381
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382

    
383
  @_Repeat
384
  def testSharerBlocksDelete(self):
385
    self.sl.acquire(shared=1)
386
    self._addThread(target=self._doItDelete)
387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
388
    self.sl.release()
389
    self._waitThreads()
390
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391
    self.sl = locking.SharedLock(self.sl.name)
392

    
393
  @_Repeat
394
  def testWaitingExclusiveBlocksSharer(self):
395
    """SKIPPED testWaitingExclusiveBlockSharer"""
396
    return
397

    
398
    self.sl.acquire(shared=1)
399
    # the lock is acquired in shared mode...
400
    self._addThread(target=self._doItExclusive)
401
    # ...but now an exclusive is waiting...
402
    self._addThread(target=self._doItSharer)
403
    # ...so the sharer should be blocked as well
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    # The exclusive passed before
408
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410

    
411
  @_Repeat
412
  def testWaitingSharerBlocksExclusive(self):
413
    """SKIPPED testWaitingSharerBlocksExclusive"""
414
    return
415

    
416
    self.sl.acquire()
417
    # the lock is acquired in exclusive mode...
418
    self._addThread(target=self._doItSharer)
419
    # ...but now a sharer is waiting...
420
    self._addThread(target=self._doItExclusive)
421
    # ...the exclusive is waiting too...
422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
423
    self.sl.release()
424
    self._waitThreads()
425
    # The sharer passed before
426
    self.assertEqual(self.done.get_nowait(), 'SHR')
427
    self.assertEqual(self.done.get_nowait(), 'EXC')
428

    
429
  def testDelete(self):
430
    self.sl.delete()
431
    self.assertRaises(errors.LockError, self.sl.acquire)
432
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433
    self.assertRaises(errors.LockError, self.sl.delete)
434

    
435
  def testDeleteTimeout(self):
436
    self.sl.delete(timeout=60)
437

    
438
  def testNoDeleteIfSharer(self):
439
    self.sl.acquire(shared=1)
440
    self.assertRaises(AssertionError, self.sl.delete)
441

    
442
  @_Repeat
443
  def testDeletePendingSharersExclusiveDelete(self):
444
    self.sl.acquire()
445
    self._addThread(target=self._doItSharer)
446
    self._addThread(target=self._doItSharer)
447
    self._addThread(target=self._doItExclusive)
448
    self._addThread(target=self._doItDelete)
449
    self.sl.delete()
450
    self._waitThreads()
451
    # The threads who were pending return ERR
452
    for _ in range(4):
453
      self.assertEqual(self.done.get_nowait(), 'ERR')
454
    self.sl = locking.SharedLock(self.sl.name)
455

    
456
  @_Repeat
457
  def testDeletePendingDeleteExclusiveSharers(self):
458
    self.sl.acquire()
459
    self._addThread(target=self._doItDelete)
460
    self._addThread(target=self._doItExclusive)
461
    self._addThread(target=self._doItSharer)
462
    self._addThread(target=self._doItSharer)
463
    self.sl.delete()
464
    self._waitThreads()
465
    # The two threads who were pending return both ERR
466
    self.assertEqual(self.done.get_nowait(), 'ERR')
467
    self.assertEqual(self.done.get_nowait(), 'ERR')
468
    self.assertEqual(self.done.get_nowait(), 'ERR')
469
    self.assertEqual(self.done.get_nowait(), 'ERR')
470
    self.sl = locking.SharedLock(self.sl.name)
471

    
472
  @_Repeat
473
  def testExclusiveAcquireTimeout(self):
474
    for shared in [0, 1]:
475
      on_queue = threading.Event()
476
      release_exclusive = threading.Event()
477

    
478
      def _LockExclusive():
479
        self.sl.acquire(shared=0, test_notify=on_queue.set)
480
        self.done.put("A: start wait")
481
        release_exclusive.wait()
482
        self.done.put("A: end wait")
483
        self.sl.release()
484

    
485
      # Start thread to hold lock in exclusive mode
486
      self._addThread(target=_LockExclusive)
487

    
488
      # Wait for wait to begin
489
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
490

    
491
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492
      # on the queue
493
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494
                                      test_notify=release_exclusive.set))
495

    
496
      self.done.put("got 2nd")
497
      self.sl.release()
498

    
499
      self._waitThreads()
500

    
501
      self.assertEqual(self.done.get_nowait(), "A: end wait")
502
      self.assertEqual(self.done.get_nowait(), "got 2nd")
503
      self.assertRaises(Queue.Empty, self.done.get_nowait)
504

    
505
  @_Repeat
506
  def testAcquireExpiringTimeout(self):
507
    def _AcquireWithTimeout(shared, timeout):
508
      if not self.sl.acquire(shared=shared, timeout=timeout):
509
        self.done.put("timeout")
510

    
511
    for shared in [0, 1]:
512
      # Lock exclusively
513
      self.sl.acquire()
514

    
515
      # Start shared acquires with timeout between 0 and 20 ms
516
      for i in range(11):
517
        self._addThread(target=_AcquireWithTimeout,
518
                        args=(shared, i * 2.0 / 1000.0))
519

    
520
      # Wait for threads to finish (makes sure the acquire timeout expires
521
      # before releasing the lock)
522
      self._waitThreads()
523

    
524
      # Release lock
525
      self.sl.release()
526

    
527
      for _ in range(11):
528
        self.assertEqual(self.done.get_nowait(), "timeout")
529

    
530
      self.assertRaises(Queue.Empty, self.done.get_nowait)
531

    
532
  @_Repeat
533
  def testSharedSkipExclusiveAcquires(self):
534
    # Tests whether shared acquires jump in front of exclusive acquires in the
535
    # queue.
536

    
537
    def _Acquire(shared, name, notify_ev, wait_ev):
538
      if notify_ev:
539
        notify_fn = notify_ev.set
540
      else:
541
        notify_fn = None
542

    
543
      if wait_ev:
544
        wait_ev.wait()
545

    
546
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547
        return
548

    
549
      self.done.put(name)
550
      self.sl.release()
551

    
552
    # Get exclusive lock while we fill the queue
553
    self.sl.acquire()
554

    
555
    shrcnt1 = 5
556
    shrcnt2 = 7
557
    shrcnt3 = 9
558
    shrcnt4 = 2
559

    
560
    # Add acquires using threading.Event for synchronization. They'll be
561
    # acquired exactly in the order defined in this list.
562
    acquires = (shrcnt1 * [(1, "shared 1")] +
563
                3 * [(0, "exclusive 1")] +
564
                shrcnt2 * [(1, "shared 2")] +
565
                shrcnt3 * [(1, "shared 3")] +
566
                shrcnt4 * [(1, "shared 4")] +
567
                3 * [(0, "exclusive 2")])
568

    
569
    ev_cur = None
570
    ev_prev = None
571

    
572
    for args in acquires:
573
      ev_cur = threading.Event()
574
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575
      ev_prev = ev_cur
576

    
577
    # Wait for last acquire to start
578
    ev_prev.wait()
579

    
580
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
581
    # together
582
    self.assertEqual(self.sl._count_pending(), 7)
583

    
584
    # Release exclusive lock and wait
585
    self.sl.release()
586

    
587
    self._waitThreads()
588

    
589
    # Check sequence
590
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591
      # Shared locks aren't guaranteed to be notified in order, but they'll be
592
      # first
593
      tmp = self.done.get_nowait()
594
      if tmp == "shared 1":
595
        shrcnt1 -= 1
596
      elif tmp == "shared 2":
597
        shrcnt2 -= 1
598
      elif tmp == "shared 3":
599
        shrcnt3 -= 1
600
      elif tmp == "shared 4":
601
        shrcnt4 -= 1
602
    self.assertEqual(shrcnt1, 0)
603
    self.assertEqual(shrcnt2, 0)
604
    self.assertEqual(shrcnt3, 0)
605
    self.assertEqual(shrcnt3, 0)
606

    
607
    for _ in range(3):
608
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
609

    
610
    for _ in range(3):
611
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
612

    
613
    self.assertRaises(Queue.Empty, self.done.get_nowait)
614

    
615
  def testIllegalDowngrade(self):
616
    # Not yet acquired
617
    self.assertRaises(AssertionError, self.sl.downgrade)
618

    
619
    # Acquire in shared mode, downgrade should be no-op
620
    self.assertTrue(self.sl.acquire(shared=1))
621
    self.assertTrue(self.sl._is_owned(shared=1))
622
    self.assertTrue(self.sl.downgrade())
623
    self.assertTrue(self.sl._is_owned(shared=1))
624
    self.sl.release()
625

    
626
  def testDowngrade(self):
627
    self.assertTrue(self.sl.acquire())
628
    self.assertTrue(self.sl._is_owned(shared=0))
629
    self.assertTrue(self.sl.downgrade())
630
    self.assertTrue(self.sl._is_owned(shared=1))
631
    self.sl.release()
632

    
633
  @_Repeat
634
  def testDowngradeJumpsAheadOfExclusive(self):
635
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636
      self.assertTrue(self.sl.acquire())
637
      self.assertTrue(self.sl._is_owned(shared=0))
638
      ev_got.set()
639
      ev_downgrade.wait()
640
      self.assertTrue(self.sl._is_owned(shared=0))
641
      self.assertTrue(self.sl.downgrade())
642
      self.assertTrue(self.sl._is_owned(shared=1))
643
      ev_release.wait()
644
      self.assertTrue(self.sl._is_owned(shared=1))
645
      self.sl.release()
646

    
647
    def _KeepExclusive2(ev_started, ev_release):
648
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649
      self.assertTrue(self.sl._is_owned(shared=0))
650
      ev_release.wait()
651
      self.assertTrue(self.sl._is_owned(shared=0))
652
      self.sl.release()
653

    
654
    def _KeepShared(ev_started, ev_got, ev_release):
655
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656
      self.assertTrue(self.sl._is_owned(shared=1))
657
      ev_got.set()
658
      ev_release.wait()
659
      self.assertTrue(self.sl._is_owned(shared=1))
660
      self.sl.release()
661

    
662
    # Acquire lock in exclusive mode
663
    ev_got_excl1 = threading.Event()
664
    ev_downgrade_excl1 = threading.Event()
665
    ev_release_excl1 = threading.Event()
666
    th_excl1 = self._addThread(target=_KeepExclusive,
667
                               args=(ev_got_excl1, ev_downgrade_excl1,
668
                                     ev_release_excl1))
669
    ev_got_excl1.wait()
670

    
671
    # Start a second exclusive acquire
672
    ev_started_excl2 = threading.Event()
673
    ev_release_excl2 = threading.Event()
674
    th_excl2 = self._addThread(target=_KeepExclusive2,
675
                               args=(ev_started_excl2, ev_release_excl2))
676
    ev_started_excl2.wait()
677

    
678
    # Start shared acquires, will jump ahead of second exclusive acquire when
679
    # first exclusive acquire downgrades
680
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681
    ev_release_shared = threading.Event()
682

    
683
    th_shared = [self._addThread(target=_KeepShared,
684
                                 args=(ev_started, ev_got, ev_release_shared))
685
                 for (ev_started, ev_got) in ev_shared]
686

    
687
    # Wait for all shared acquires to start
688
    for (ev, _) in ev_shared:
689
      ev.wait()
690

    
691
    # Check lock information
692
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693
                     (self.sl.name, "exclusive", [th_excl1.getName()], None))
694
    (_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
695
    self.assertEqual([(pendmode, sorted(waiting))
696
                      for (pendmode, waiting) in pending],
697
                     [("exclusive", [th_excl2.getName()]),
698
                      ("shared", sorted(th.getName() for th in th_shared))])
699

    
700
    # Shared acquires won't start until the exclusive lock is downgraded
701
    ev_downgrade_excl1.set()
702

    
703
    # Wait for all shared acquires to be successful
704
    for (_, ev) in ev_shared:
705
      ev.wait()
706

    
707
    # Check lock information again
708
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])),
709
                     (self.sl.name, "shared", None,
710
                      [("exclusive", [th_excl2.getName()])]))
711
    (_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER]))
712
    self.assertEqual(set(owner), set([th_excl1.getName()] +
713
                                     [th.getName() for th in th_shared]))
714

    
715
    ev_release_excl1.set()
716
    ev_release_excl2.set()
717
    ev_release_shared.set()
718

    
719
    self._waitThreads()
720

    
721
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER,
722
                                          query.LQ_PENDING])),
723
                     (self.sl.name, None, None, []))
724

    
725
  @_Repeat
726
  def testMixedAcquireTimeout(self):
727
    sync = threading.Event()
728

    
729
    def _AcquireShared(ev):
730
      if not self.sl.acquire(shared=1, timeout=None):
731
        return
732

    
733
      self.done.put("shared")
734

    
735
      # Notify main thread
736
      ev.set()
737

    
738
      # Wait for notification from main thread
739
      sync.wait()
740

    
741
      # Release lock
742
      self.sl.release()
743

    
744
    acquires = []
745
    for _ in range(3):
746
      ev = threading.Event()
747
      self._addThread(target=_AcquireShared, args=(ev, ))
748
      acquires.append(ev)
749

    
750
    # Wait for all acquires to finish
751
    for i in acquires:
752
      i.wait()
753

    
754
    self.assertEqual(self.sl._count_pending(), 0)
755

    
756
    # Try to get exclusive lock
757
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
758

    
759
    # Acquire exclusive without timeout
760
    exclsync = threading.Event()
761
    exclev = threading.Event()
762

    
763
    def _AcquireExclusive():
764
      if not self.sl.acquire(shared=0):
765
        return
766

    
767
      self.done.put("exclusive")
768

    
769
      # Notify main thread
770
      exclev.set()
771

    
772
      # Wait for notification from main thread
773
      exclsync.wait()
774

    
775
      self.sl.release()
776

    
777
    self._addThread(target=_AcquireExclusive)
778

    
779
    # Try to get exclusive lock
780
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
781

    
782
    # Make all shared holders release their locks
783
    sync.set()
784

    
785
    # Wait for exclusive acquire to succeed
786
    exclev.wait()
787

    
788
    self.assertEqual(self.sl._count_pending(), 0)
789

    
790
    # Try to get exclusive lock
791
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
792

    
793
    def _AcquireSharedSimple():
794
      if self.sl.acquire(shared=1, timeout=None):
795
        self.done.put("shared2")
796
        self.sl.release()
797

    
798
    for _ in range(10):
799
      self._addThread(target=_AcquireSharedSimple)
800

    
801
    # Tell exclusive lock to release
802
    exclsync.set()
803

    
804
    # Wait for everything to finish
805
    self._waitThreads()
806

    
807
    self.assertEqual(self.sl._count_pending(), 0)
808

    
809
    # Check sequence
810
    for _ in range(3):
811
      self.assertEqual(self.done.get_nowait(), "shared")
812

    
813
    self.assertEqual(self.done.get_nowait(), "exclusive")
814

    
815
    for _ in range(10):
816
      self.assertEqual(self.done.get_nowait(), "shared2")
817

    
818
    self.assertRaises(Queue.Empty, self.done.get_nowait)
819

    
820
  def testPriority(self):
821
    # Acquire in exclusive mode
822
    self.assert_(self.sl.acquire(shared=0))
823

    
824
    # Queue acquires
825
    def _Acquire(prev, next, shared, priority, result):
826
      prev.wait()
827
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
828
      try:
829
        self.done.put(result)
830
      finally:
831
        self.sl.release()
832

    
833
    counter = itertools.count(0)
834
    priorities = range(-20, 30)
835
    first = threading.Event()
836
    prev = first
837

    
838
    # Data structure:
839
    # {
840
    #   priority:
841
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
842
    #      (shared/exclusive, ...),
843
    #      ...,
844
    #     ],
845
    # }
846
    perprio = {}
847

    
848
    # References shared acquire per priority in L{perprio}. Data structure:
849
    # {
850
    #   priority: (shared=1, set(acquire names), set(pending threads)),
851
    # }
852
    prioshared = {}
853

    
854
    for seed in [4979, 9523, 14902, 32440]:
855
      # Use a deterministic random generator
856
      rnd = random.Random(seed)
857
      for priority in [rnd.choice(priorities) for _ in range(30)]:
858
        modes = [0, 1]
859
        rnd.shuffle(modes)
860
        for shared in modes:
861
          # Unique name
862
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
863

    
864
          ev = threading.Event()
865
          thread = self._addThread(target=_Acquire,
866
                                   args=(prev, ev, shared, priority, acqname))
867
          prev = ev
868

    
869
          # Record expected aqcuire, see above for structure
870
          data = (shared, set([acqname]), set([thread]))
871
          priolist = perprio.setdefault(priority, [])
872
          if shared:
873
            priosh = prioshared.get(priority, None)
874
            if priosh:
875
              # Shared acquires are merged
876
              for i, j in zip(priosh[1:], data[1:]):
877
                i.update(j)
878
              assert data[0] == priosh[0]
879
            else:
880
              prioshared[priority] = data
881
              priolist.append(data)
882
          else:
883
            priolist.append(data)
884

    
885
    # Start all acquires and wait for them
886
    first.set()
887
    prev.wait()
888

    
889
    # Check lock information
890
    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
891
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
892
                     (self.sl.name, "exclusive",
893
                      [threading.currentThread().getName()], None))
894

    
895
    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
896

    
897
    # Let threads acquire the lock
898
    self.sl.release()
899

    
900
    # Wait for everything to finish
901
    self._waitThreads()
902

    
903
    self.assert_(self.sl._check_empty())
904

    
905
    # Check acquires by priority
906
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
907
      for (_, names, _) in acquires:
908
        # For shared acquires, the set will contain 1..n entries. For exclusive
909
        # acquires only one.
910
        while names:
911
          names.remove(self.done.get_nowait())
912
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
913

    
914
    self.assertRaises(Queue.Empty, self.done.get_nowait)
915

    
916
  def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
917
    self.assertEqual(name, self.sl.name)
918
    self.assert_(mode is None)
919
    self.assert_(owner is None)
920

    
921
    self.assertEqual([(pendmode, sorted(waiting))
922
                      for (pendmode, waiting) in pending],
923
                     [(["exclusive", "shared"][int(bool(shared))],
924
                       sorted(t.getName() for t in threads))
925
                      for acquires in [perprio[i]
926
                                       for i in sorted(perprio.keys())]
927
                      for (shared, _, threads) in acquires])
928

    
929

    
930
class TestSharedLockInCondition(_ThreadedTestCase):
931
  """SharedLock as a condition lock tests"""
932

    
933
  def setUp(self):
934
    _ThreadedTestCase.setUp(self)
935
    self.sl = locking.SharedLock("TestSharedLockInCondition")
936
    self.setCondition()
937

    
938
  def setCondition(self):
939
    self.cond = threading.Condition(self.sl)
940

    
941
  def testKeepMode(self):
942
    self.cond.acquire(shared=1)
943
    self.assert_(self.sl._is_owned(shared=1))
944
    self.cond.wait(0)
945
    self.assert_(self.sl._is_owned(shared=1))
946
    self.cond.release()
947
    self.cond.acquire(shared=0)
948
    self.assert_(self.sl._is_owned(shared=0))
949
    self.cond.wait(0)
950
    self.assert_(self.sl._is_owned(shared=0))
951
    self.cond.release()
952

    
953

    
954
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
955
  """SharedLock as a pipe condition lock tests"""
956

    
957
  def setCondition(self):
958
    self.cond = locking.PipeCondition(self.sl)
959

    
960

    
961
class TestSSynchronizedDecorator(_ThreadedTestCase):
962
  """Shared Lock Synchronized decorator test"""
963

    
964
  def setUp(self):
965
    _ThreadedTestCase.setUp(self)
966

    
967
  @locking.ssynchronized(_decoratorlock)
968
  def _doItExclusive(self):
969
    self.assert_(_decoratorlock._is_owned())
970
    self.done.put('EXC')
971

    
972
  @locking.ssynchronized(_decoratorlock, shared=1)
973
  def _doItSharer(self):
974
    self.assert_(_decoratorlock._is_owned(shared=1))
975
    self.done.put('SHR')
976

    
977
  def testDecoratedFunctions(self):
978
    self._doItExclusive()
979
    self.assertFalse(_decoratorlock._is_owned())
980
    self._doItSharer()
981
    self.assertFalse(_decoratorlock._is_owned())
982

    
983
  def testSharersCanCoexist(self):
984
    _decoratorlock.acquire(shared=1)
985
    threading.Thread(target=self._doItSharer).start()
986
    self.assert_(self.done.get(True, 1))
987
    _decoratorlock.release()
988

    
989
  @_Repeat
990
  def testExclusiveBlocksExclusive(self):
991
    _decoratorlock.acquire()
992
    self._addThread(target=self._doItExclusive)
993
    # give it a bit of time to check that it's not actually doing anything
994
    self.assertRaises(Queue.Empty, self.done.get_nowait)
995
    _decoratorlock.release()
996
    self._waitThreads()
997
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
998

    
999
  @_Repeat
1000
  def testExclusiveBlocksSharer(self):
1001
    _decoratorlock.acquire()
1002
    self._addThread(target=self._doItSharer)
1003
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1004
    _decoratorlock.release()
1005
    self._waitThreads()
1006
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1007

    
1008
  @_Repeat
1009
  def testSharerBlocksExclusive(self):
1010
    _decoratorlock.acquire(shared=1)
1011
    self._addThread(target=self._doItExclusive)
1012
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1013
    _decoratorlock.release()
1014
    self._waitThreads()
1015
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1016

    
1017

    
1018
class TestLockSet(_ThreadedTestCase):
1019
  """LockSet tests"""
1020

    
1021
  def setUp(self):
1022
    _ThreadedTestCase.setUp(self)
1023
    self._setUpLS()
1024

    
1025
  def _setUpLS(self):
1026
    """Helper to (re)initialize the lock set"""
1027
    self.resources = ['one', 'two', 'three']
1028
    self.ls = locking.LockSet(self.resources, "TestLockSet")
1029

    
1030
  def testResources(self):
1031
    self.assertEquals(self.ls._names(), set(self.resources))
1032
    newls = locking.LockSet([], "TestLockSet.testResources")
1033
    self.assertEquals(newls._names(), set())
1034

    
1035
  def testAcquireRelease(self):
1036
    self.assert_(self.ls.acquire('one'))
1037
    self.assertEquals(self.ls._list_owned(), set(['one']))
1038
    self.ls.release()
1039
    self.assertEquals(self.ls._list_owned(), set())
1040
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
1041
    self.assertEquals(self.ls._list_owned(), set(['one']))
1042
    self.ls.release()
1043
    self.assertEquals(self.ls._list_owned(), set())
1044
    self.ls.acquire(['one', 'two', 'three'])
1045
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1046
    self.ls.release('one')
1047
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
1048
    self.ls.release(['three'])
1049
    self.assertEquals(self.ls._list_owned(), set(['two']))
1050
    self.ls.release()
1051
    self.assertEquals(self.ls._list_owned(), set())
1052
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1053
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
1054
    self.ls.release()
1055
    self.assertEquals(self.ls._list_owned(), set())
1056

    
1057
  def testNoDoubleAcquire(self):
1058
    self.ls.acquire('one')
1059
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
1060
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1061
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1062
    self.ls.release()
1063
    self.ls.acquire(['one', 'three'])
1064
    self.ls.release('one')
1065
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1066
    self.ls.release('three')
1067

    
1068
  def testNoWrongRelease(self):
1069
    self.assertRaises(AssertionError, self.ls.release)
1070
    self.ls.acquire('one')
1071
    self.assertRaises(AssertionError, self.ls.release, 'two')
1072

    
1073
  def testAddRemove(self):
1074
    self.ls.add('four')
1075
    self.assertEquals(self.ls._list_owned(), set())
1076
    self.assert_('four' in self.ls._names())
1077
    self.ls.add(['five', 'six', 'seven'], acquired=1)
1078
    self.assert_('five' in self.ls._names())
1079
    self.assert_('six' in self.ls._names())
1080
    self.assert_('seven' in self.ls._names())
1081
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
1082
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1083
    self.assert_('five' not in self.ls._names())
1084
    self.assert_('six' not in self.ls._names())
1085
    self.assertEquals(self.ls._list_owned(), set(['seven']))
1086
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1087
    self.ls.remove('seven')
1088
    self.assert_('seven' not in self.ls._names())
1089
    self.assertEquals(self.ls._list_owned(), set([]))
1090
    self.ls.acquire(None, shared=1)
1091
    self.assertRaises(AssertionError, self.ls.add, 'eight')
1092
    self.ls.release()
1093
    self.ls.acquire(None)
1094
    self.ls.add('eight', acquired=1)
1095
    self.assert_('eight' in self.ls._names())
1096
    self.assert_('eight' in self.ls._list_owned())
1097
    self.ls.add('nine')
1098
    self.assert_('nine' in self.ls._names())
1099
    self.assert_('nine' not in self.ls._list_owned())
1100
    self.ls.release()
1101
    self.ls.remove(['two'])
1102
    self.assert_('two' not in self.ls._names())
1103
    self.ls.acquire('three')
1104
    self.assertEquals(self.ls.remove(['three']), ['three'])
1105
    self.assert_('three' not in self.ls._names())
1106
    self.assertEquals(self.ls.remove('three'), [])
1107
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1108
    self.assert_('one' not in self.ls._names())
1109

    
1110
  def testRemoveNonBlocking(self):
1111
    self.ls.acquire('one')
1112
    self.assertEquals(self.ls.remove('one'), ['one'])
1113
    self.ls.acquire(['two', 'three'])
1114
    self.assertEquals(self.ls.remove(['two', 'three']),
1115
                      ['two', 'three'])
1116

    
1117
  def testNoDoubleAdd(self):
1118
    self.assertRaises(errors.LockError, self.ls.add, 'two')
1119
    self.ls.add('four')
1120
    self.assertRaises(errors.LockError, self.ls.add, 'four')
1121

    
1122
  def testNoWrongRemoves(self):
1123
    self.ls.acquire(['one', 'three'], shared=1)
1124
    # Cannot remove 'two' while holding something which is not a superset
1125
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1126
    # Cannot remove 'three' as we are sharing it
1127
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1128

    
1129
  def testAcquireSetLock(self):
1130
    # acquire the set-lock exclusively
1131
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1132
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1133
    self.assertEquals(self.ls._is_owned(), True)
1134
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1135
    # I can still add/remove elements...
1136
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1137
    self.assert_(self.ls.add('six'))
1138
    self.ls.release()
1139
    # share the set-lock
1140
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1141
    # adding new elements is not possible
1142
    self.assertRaises(AssertionError, self.ls.add, 'five')
1143
    self.ls.release()
1144

    
1145
  def testAcquireWithRepetitions(self):
1146
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1147
                      set(['two', 'two', 'three']))
1148
    self.ls.release(['two', 'two'])
1149
    self.assertEquals(self.ls._list_owned(), set(['three']))
1150

    
1151
  def testEmptyAcquire(self):
1152
    # Acquire an empty list of locks...
1153
    self.assertEquals(self.ls.acquire([]), set())
1154
    self.assertEquals(self.ls._list_owned(), set())
1155
    # New locks can still be addded
1156
    self.assert_(self.ls.add('six'))
1157
    # "re-acquiring" is not an issue, since we had really acquired nothing
1158
    self.assertEquals(self.ls.acquire([], shared=1), set())
1159
    self.assertEquals(self.ls._list_owned(), set())
1160
    # We haven't really acquired anything, so we cannot release
1161
    self.assertRaises(AssertionError, self.ls.release)
1162

    
1163
  def _doLockSet(self, names, shared):
1164
    try:
1165
      self.ls.acquire(names, shared=shared)
1166
      self.done.put('DONE')
1167
      self.ls.release()
1168
    except errors.LockError:
1169
      self.done.put('ERR')
1170

    
1171
  def _doAddSet(self, names):
1172
    try:
1173
      self.ls.add(names, acquired=1)
1174
      self.done.put('DONE')
1175
      self.ls.release()
1176
    except errors.LockError:
1177
      self.done.put('ERR')
1178

    
1179
  def _doRemoveSet(self, names):
1180
    self.done.put(self.ls.remove(names))
1181

    
1182
  @_Repeat
1183
  def testConcurrentSharedAcquire(self):
1184
    self.ls.acquire(['one', 'two'], shared=1)
1185
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1186
    self._waitThreads()
1187
    self.assertEqual(self.done.get_nowait(), 'DONE')
1188
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1189
    self._waitThreads()
1190
    self.assertEqual(self.done.get_nowait(), 'DONE')
1191
    self._addThread(target=self._doLockSet, args=('three', 1))
1192
    self._waitThreads()
1193
    self.assertEqual(self.done.get_nowait(), 'DONE')
1194
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1195
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1196
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1197
    self.ls.release()
1198
    self._waitThreads()
1199
    self.assertEqual(self.done.get_nowait(), 'DONE')
1200
    self.assertEqual(self.done.get_nowait(), 'DONE')
1201

    
1202
  @_Repeat
1203
  def testConcurrentExclusiveAcquire(self):
1204
    self.ls.acquire(['one', 'two'])
1205
    self._addThread(target=self._doLockSet, args=('three', 1))
1206
    self._waitThreads()
1207
    self.assertEqual(self.done.get_nowait(), 'DONE')
1208
    self._addThread(target=self._doLockSet, args=('three', 0))
1209
    self._waitThreads()
1210
    self.assertEqual(self.done.get_nowait(), 'DONE')
1211
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1212
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1213
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1214
    self._addThread(target=self._doLockSet, args=('one', 0))
1215
    self._addThread(target=self._doLockSet, args=('one', 1))
1216
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1217
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1218
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1219
    self.ls.release()
1220
    self._waitThreads()
1221
    for _ in range(6):
1222
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1223

    
1224
  @_Repeat
1225
  def testSimpleAcquireTimeoutExpiring(self):
1226
    names = sorted(self.ls._names())
1227
    self.assert_(len(names) >= 3)
1228

    
1229
    # Get name of first lock
1230
    first = names[0]
1231

    
1232
    # Get name of last lock
1233
    last = names.pop()
1234

    
1235
    checks = [
1236
      # Block first and try to lock it again
1237
      (first, first),
1238

    
1239
      # Block last and try to lock all locks
1240
      (None, first),
1241

    
1242
      # Block last and try to lock it again
1243
      (last, last),
1244
      ]
1245

    
1246
    for (wanted, block) in checks:
1247
      # Lock in exclusive mode
1248
      self.assert_(self.ls.acquire(block, shared=0))
1249

    
1250
      def _AcquireOne():
1251
        # Try to get the same lock again with a timeout (should never succeed)
1252
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1253
        if acquired:
1254
          self.done.put("acquired")
1255
          self.ls.release()
1256
        else:
1257
          self.assert_(acquired is None)
1258
          self.assertFalse(self.ls._list_owned())
1259
          self.assertFalse(self.ls._is_owned())
1260
          self.done.put("not acquired")
1261

    
1262
      self._addThread(target=_AcquireOne)
1263

    
1264
      # Wait for timeout in thread to expire
1265
      self._waitThreads()
1266

    
1267
      # Release exclusive lock again
1268
      self.ls.release()
1269

    
1270
      self.assertEqual(self.done.get_nowait(), "not acquired")
1271
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1272

    
1273
  @_Repeat
1274
  def testDelayedAndExpiringLockAcquire(self):
1275
    self._setUpLS()
1276
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1277

    
1278
    for expire in (False, True):
1279
      names = sorted(self.ls._names())
1280
      self.assertEqual(len(names), 8)
1281

    
1282
      lock_ev = dict([(i, threading.Event()) for i in names])
1283

    
1284
      # Lock all in exclusive mode
1285
      self.assert_(self.ls.acquire(names, shared=0))
1286

    
1287
      if expire:
1288
        # We'll wait at least 300ms per lock
1289
        lockwait = len(names) * [0.3]
1290

    
1291
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1292
        # this gives us up to 2.4s to fail.
1293
        lockall_timeout = 0.4
1294
      else:
1295
        # This should finish rather quickly
1296
        lockwait = None
1297
        lockall_timeout = len(names) * 5.0
1298

    
1299
      def _LockAll():
1300
        def acquire_notification(name):
1301
          if not expire:
1302
            self.done.put("getting %s" % name)
1303

    
1304
          # Kick next lock
1305
          lock_ev[name].set()
1306

    
1307
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1308
                           test_notify=acquire_notification):
1309
          self.done.put("got all")
1310
          self.ls.release()
1311
        else:
1312
          self.done.put("timeout on all")
1313

    
1314
        # Notify all locks
1315
        for ev in lock_ev.values():
1316
          ev.set()
1317

    
1318
      t = self._addThread(target=_LockAll)
1319

    
1320
      for idx, name in enumerate(names):
1321
        # Wait for actual acquire on this lock to start
1322
        lock_ev[name].wait(10.0)
1323

    
1324
        if expire and t.isAlive():
1325
          # Wait some time after getting the notification to make sure the lock
1326
          # acquire will expire
1327
          SafeSleep(lockwait[idx])
1328

    
1329
        self.ls.release(names=name)
1330

    
1331
      self.assertFalse(self.ls._list_owned())
1332

    
1333
      self._waitThreads()
1334

    
1335
      if expire:
1336
        # Not checking which locks were actually acquired. Doing so would be
1337
        # too timing-dependant.
1338
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1339
      else:
1340
        for i in names:
1341
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1342
        self.assertEqual(self.done.get_nowait(), "got all")
1343
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1344

    
1345
  @_Repeat
1346
  def testConcurrentRemove(self):
1347
    self.ls.add('four')
1348
    self.ls.acquire(['one', 'two', 'four'])
1349
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1350
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1351
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1352
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1353
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1354
    self.ls.remove('one')
1355
    self.ls.release()
1356
    self._waitThreads()
1357
    for i in range(4):
1358
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1359
    self.ls.add(['five', 'six'], acquired=1)
1360
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1361
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1362
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1363
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1364
    self.ls.remove('five')
1365
    self.ls.release()
1366
    self._waitThreads()
1367
    for i in range(4):
1368
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1369
    self.ls.acquire(['three', 'four'])
1370
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1371
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1372
    self.ls.remove('four')
1373
    self._waitThreads()
1374
    self.assertEqual(self.done.get_nowait(), ['six'])
1375
    self._addThread(target=self._doRemoveSet, args=(['two']))
1376
    self._waitThreads()
1377
    self.assertEqual(self.done.get_nowait(), ['two'])
1378
    self.ls.release()
1379
    # reset lockset
1380
    self._setUpLS()
1381

    
1382
  @_Repeat
1383
  def testConcurrentSharedSetLock(self):
1384
    # share the set-lock...
1385
    self.ls.acquire(None, shared=1)
1386
    # ...another thread can share it too
1387
    self._addThread(target=self._doLockSet, args=(None, 1))
1388
    self._waitThreads()
1389
    self.assertEqual(self.done.get_nowait(), 'DONE')
1390
    # ...or just share some elements
1391
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1392
    self._waitThreads()
1393
    self.assertEqual(self.done.get_nowait(), 'DONE')
1394
    # ...but not add new ones or remove any
1395
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1396
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1397
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1398
    # this just releases the set-lock
1399
    self.ls.release([])
1400
    t.join(60)
1401
    self.assertEqual(self.done.get_nowait(), 'DONE')
1402
    # release the lock on the actual elements so remove() can proceed too
1403
    self.ls.release()
1404
    self._waitThreads()
1405
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1406
    # reset lockset
1407
    self._setUpLS()
1408

    
1409
  @_Repeat
1410
  def testConcurrentExclusiveSetLock(self):
1411
    # acquire the set-lock...
1412
    self.ls.acquire(None, shared=0)
1413
    # ...no one can do anything else
1414
    self._addThread(target=self._doLockSet, args=(None, 1))
1415
    self._addThread(target=self._doLockSet, args=(None, 0))
1416
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1417
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1418
    self._addThread(target=self._doAddSet, args=(['nine']))
1419
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1420
    self.ls.release()
1421
    self._waitThreads()
1422
    for _ in range(5):
1423
      self.assertEqual(self.done.get(True, 1), 'DONE')
1424
    # cleanup
1425
    self._setUpLS()
1426

    
1427
  @_Repeat
1428
  def testConcurrentSetLockAdd(self):
1429
    self.ls.acquire('one')
1430
    # Another thread wants the whole SetLock
1431
    self._addThread(target=self._doLockSet, args=(None, 0))
1432
    self._addThread(target=self._doLockSet, args=(None, 1))
1433
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1434
    self.assertRaises(AssertionError, self.ls.add, 'four')
1435
    self.ls.release()
1436
    self._waitThreads()
1437
    self.assertEqual(self.done.get_nowait(), 'DONE')
1438
    self.assertEqual(self.done.get_nowait(), 'DONE')
1439
    self.ls.acquire(None)
1440
    self._addThread(target=self._doLockSet, args=(None, 0))
1441
    self._addThread(target=self._doLockSet, args=(None, 1))
1442
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1443
    self.ls.add('four')
1444
    self.ls.add('five', acquired=1)
1445
    self.ls.add('six', acquired=1, shared=1)
1446
    self.assertEquals(self.ls._list_owned(),
1447
      set(['one', 'two', 'three', 'five', 'six']))
1448
    self.assertEquals(self.ls._is_owned(), True)
1449
    self.assertEquals(self.ls._names(),
1450
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1451
    self.ls.release()
1452
    self._waitThreads()
1453
    self.assertEqual(self.done.get_nowait(), 'DONE')
1454
    self.assertEqual(self.done.get_nowait(), 'DONE')
1455
    self._setUpLS()
1456

    
1457
  @_Repeat
1458
  def testEmptyLockSet(self):
1459
    # get the set-lock
1460
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1461
    # now empty it...
1462
    self.ls.remove(['one', 'two', 'three'])
1463
    # and adds/locks by another thread still wait
1464
    self._addThread(target=self._doAddSet, args=(['nine']))
1465
    self._addThread(target=self._doLockSet, args=(None, 1))
1466
    self._addThread(target=self._doLockSet, args=(None, 0))
1467
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1468
    self.ls.release()
1469
    self._waitThreads()
1470
    for _ in range(3):
1471
      self.assertEqual(self.done.get_nowait(), 'DONE')
1472
    # empty it again...
1473
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1474
    # now share it...
1475
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1476
    # other sharers can go, adds still wait
1477
    self._addThread(target=self._doLockSet, args=(None, 1))
1478
    self._waitThreads()
1479
    self.assertEqual(self.done.get_nowait(), 'DONE')
1480
    self._addThread(target=self._doAddSet, args=(['nine']))
1481
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1482
    self.ls.release()
1483
    self._waitThreads()
1484
    self.assertEqual(self.done.get_nowait(), 'DONE')
1485
    self._setUpLS()
1486

    
1487
  def testAcquireWithNamesDowngrade(self):
1488
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1489
    self.assertTrue(self.ls._is_owned())
1490
    self.assertFalse(self.ls._get_lock()._is_owned())
1491
    self.ls.release()
1492
    self.assertFalse(self.ls._is_owned())
1493
    self.assertFalse(self.ls._get_lock()._is_owned())
1494
    # Can't downgrade after releasing
1495
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1496

    
1497
  def testDowngrade(self):
1498
    # Not owning anything, must raise an exception
1499
    self.assertFalse(self.ls._is_owned())
1500
    self.assertRaises(AssertionError, self.ls.downgrade)
1501

    
1502
    self.assertFalse(compat.any(i._is_owned()
1503
                                for i in self.ls._get_lockdict().values()))
1504

    
1505
    self.assertEquals(self.ls.acquire(None, shared=0),
1506
                      set(["one", "two", "three"]))
1507
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1508

    
1509
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1510
    self.assertTrue(compat.all(i._is_owned(shared=0)
1511
                               for i in self.ls._get_lockdict().values()))
1512

    
1513
    # Start downgrading locks
1514
    self.assertTrue(self.ls.downgrade(names=["one"]))
1515
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1516
    self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1517
                               for name, lock in
1518
                                 self.ls._get_lockdict().items()))
1519

    
1520
    self.assertTrue(self.ls.downgrade(names="two"))
1521
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1522
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1523
    self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1524
                               for name, lock in
1525
                                 self.ls._get_lockdict().items()))
1526

    
1527
    # Downgrading the last exclusive lock to shared must downgrade the
1528
    # lockset-internal lock too
1529
    self.assertTrue(self.ls.downgrade(names="three"))
1530
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1531
    self.assertTrue(compat.all(i._is_owned(shared=1)
1532
                               for i in self.ls._get_lockdict().values()))
1533

    
1534
    # Downgrading a shared lock must be a no-op
1535
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1536
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1537
    self.assertTrue(compat.all(i._is_owned(shared=1)
1538
                               for i in self.ls._get_lockdict().values()))
1539

    
1540
    self.ls.release()
1541

    
1542
  def testPriority(self):
1543
    def _Acquire(prev, next, name, priority, success_fn):
1544
      prev.wait()
1545
      self.assert_(self.ls.acquire(name, shared=0,
1546
                                   priority=priority,
1547
                                   test_notify=lambda _: next.set()))
1548
      try:
1549
        success_fn()
1550
      finally:
1551
        self.ls.release()
1552

    
1553
    # Get all in exclusive mode
1554
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1555

    
1556
    done_two = Queue.Queue(0)
1557

    
1558
    first = threading.Event()
1559
    prev = first
1560

    
1561
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1562
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1563

    
1564
    # Use a deterministic random generator
1565
    random.Random(741).shuffle(acquires)
1566

    
1567
    for (name, prio, done) in acquires:
1568
      ev = threading.Event()
1569
      self._addThread(target=_Acquire,
1570
                      args=(prev, ev, name, prio,
1571
                            compat.partial(done.put, "Prio%s" % prio)))
1572
      prev = ev
1573

    
1574
    # Start acquires
1575
    first.set()
1576

    
1577
    # Wait for last acquire to start
1578
    prev.wait()
1579

    
1580
    # Let threads acquire locks
1581
    self.ls.release()
1582

    
1583
    # Wait for threads to finish
1584
    self._waitThreads()
1585

    
1586
    for i in range(1, 33):
1587
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1588
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1589

    
1590
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1591
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1592

    
1593

    
1594
class TestGanetiLockManager(_ThreadedTestCase):
1595

    
1596
  def setUp(self):
1597
    _ThreadedTestCase.setUp(self)
1598
    self.nodes=['n1', 'n2']
1599
    self.nodegroups=['g1', 'g2']
1600
    self.instances=['i1', 'i2', 'i3']
1601
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1602
                                        self.instances)
1603

    
1604
  def tearDown(self):
1605
    # Don't try this at home...
1606
    locking.GanetiLockManager._instance = None
1607

    
1608
  def testLockingConstants(self):
1609
    # The locking library internally cheats by assuming its constants have some
1610
    # relationships with each other. Check those hold true.
1611
    # This relationship is also used in the Processor to recursively acquire
1612
    # the right locks. Again, please don't break it.
1613
    for i in range(len(locking.LEVELS)):
1614
      self.assertEqual(i, locking.LEVELS[i])
1615

    
1616
  def testDoubleGLFails(self):
1617
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1618

    
1619
  def testLockNames(self):
1620
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1621
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1622
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1623
                     set(self.nodegroups))
1624
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1625
                     set(self.instances))
1626

    
1627
  def testInitAndResources(self):
1628
    locking.GanetiLockManager._instance = None
1629
    self.GL = locking.GanetiLockManager([], [], [])
1630
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1631
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1632
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1633
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1634

    
1635
    locking.GanetiLockManager._instance = None
1636
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1637
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1638
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1639
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1640
                                    set(self.nodegroups))
1641
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1642

    
1643
    locking.GanetiLockManager._instance = None
1644
    self.GL = locking.GanetiLockManager([], [], self.instances)
1645
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1646
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1647
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1648
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1649
                     set(self.instances))
1650

    
1651
  def testAcquireRelease(self):
1652
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1653
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1654
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1655
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1656
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1657
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1658
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1659
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1660
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1661
    self.GL.release(locking.LEVEL_NODE)
1662
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1663
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1664
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1665
    self.GL.release(locking.LEVEL_NODEGROUP)
1666
    self.GL.release(locking.LEVEL_INSTANCE)
1667
    self.assertRaises(errors.LockError, self.GL.acquire,
1668
                      locking.LEVEL_INSTANCE, ['i5'])
1669
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1670
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1671

    
1672
  def testAcquireWholeSets(self):
1673
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1674
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1675
                      set(self.instances))
1676
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1677
                      set(self.instances))
1678
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1679
                      set(self.nodegroups))
1680
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1681
                      set(self.nodegroups))
1682
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1683
                      set(self.nodes))
1684
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1685
                      set(self.nodes))
1686
    self.GL.release(locking.LEVEL_NODE)
1687
    self.GL.release(locking.LEVEL_NODEGROUP)
1688
    self.GL.release(locking.LEVEL_INSTANCE)
1689
    self.GL.release(locking.LEVEL_CLUSTER)
1690

    
1691
  def testAcquireWholeAndPartial(self):
1692
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1693
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1694
                      set(self.instances))
1695
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1696
                      set(self.instances))
1697
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1698
                      set(['n2']))
1699
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1700
                      set(['n2']))
1701
    self.GL.release(locking.LEVEL_NODE)
1702
    self.GL.release(locking.LEVEL_INSTANCE)
1703
    self.GL.release(locking.LEVEL_CLUSTER)
1704

    
1705
  def testBGLDependency(self):
1706
    self.assertRaises(AssertionError, self.GL.acquire,
1707
                      locking.LEVEL_NODE, ['n1', 'n2'])
1708
    self.assertRaises(AssertionError, self.GL.acquire,
1709
                      locking.LEVEL_INSTANCE, ['i3'])
1710
    self.assertRaises(AssertionError, self.GL.acquire,
1711
                      locking.LEVEL_NODEGROUP, ['g1'])
1712
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1713
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1714
    self.assertRaises(AssertionError, self.GL.release,
1715
                      locking.LEVEL_CLUSTER, ['BGL'])
1716
    self.assertRaises(AssertionError, self.GL.release,
1717
                      locking.LEVEL_CLUSTER)
1718
    self.GL.release(locking.LEVEL_NODE)
1719
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1720
    self.assertRaises(AssertionError, self.GL.release,
1721
                      locking.LEVEL_CLUSTER, ['BGL'])
1722
    self.assertRaises(AssertionError, self.GL.release,
1723
                      locking.LEVEL_CLUSTER)
1724
    self.GL.release(locking.LEVEL_INSTANCE)
1725
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1726
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1727
    self.assertRaises(AssertionError, self.GL.release,
1728
                      locking.LEVEL_CLUSTER, ['BGL'])
1729
    self.assertRaises(AssertionError, self.GL.release,
1730
                      locking.LEVEL_CLUSTER)
1731
    self.GL.release(locking.LEVEL_NODEGROUP)
1732
    self.GL.release(locking.LEVEL_CLUSTER)
1733

    
1734
  def testWrongOrder(self):
1735
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1736
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1737
    self.assertRaises(AssertionError, self.GL.acquire,
1738
                      locking.LEVEL_NODE, ['n1'])
1739
    self.assertRaises(AssertionError, self.GL.acquire,
1740
                      locking.LEVEL_NODEGROUP, ['g1'])
1741
    self.assertRaises(AssertionError, self.GL.acquire,
1742
                      locking.LEVEL_INSTANCE, ['i2'])
1743

    
1744
  def testModifiableLevels(self):
1745
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1746
                      ['BGL2'])
1747
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1748
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1749
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1750
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1751
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1752
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1753
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1754
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1755
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1756
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1757
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1758
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1759
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1760
                      ['BGL2'])
1761

    
1762
  # Helper function to run as a thread that shared the BGL and then acquires
1763
  # some locks at another level.
1764
  def _doLock(self, level, names, shared):
1765
    try:
1766
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1767
      self.GL.acquire(level, names, shared=shared)
1768
      self.done.put('DONE')
1769
      self.GL.release(level)
1770
      self.GL.release(locking.LEVEL_CLUSTER)
1771
    except errors.LockError:
1772
      self.done.put('ERR')
1773

    
1774
  @_Repeat
1775
  def testConcurrency(self):
1776
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1777
    self._addThread(target=self._doLock,
1778
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1779
    self._waitThreads()
1780
    self.assertEqual(self.done.get_nowait(), 'DONE')
1781
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1782
    self._addThread(target=self._doLock,
1783
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1784
    self._waitThreads()
1785
    self.assertEqual(self.done.get_nowait(), 'DONE')
1786
    self._addThread(target=self._doLock,
1787
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1788
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1789
    self.GL.release(locking.LEVEL_INSTANCE)
1790
    self._waitThreads()
1791
    self.assertEqual(self.done.get_nowait(), 'DONE')
1792
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1793
    self._addThread(target=self._doLock,
1794
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1795
    self._waitThreads()
1796
    self.assertEqual(self.done.get_nowait(), 'DONE')
1797
    self._addThread(target=self._doLock,
1798
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1799
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1800
    self.GL.release(locking.LEVEL_INSTANCE)
1801
    self._waitThreads()
1802
    self.assertEqual(self.done.get(True, 1), 'DONE')
1803
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1804

    
1805

    
1806
class TestLockMonitor(_ThreadedTestCase):
1807
  def setUp(self):
1808
    _ThreadedTestCase.setUp(self)
1809
    self.lm = locking.LockMonitor()
1810

    
1811
  def testSingleThread(self):
1812
    locks = []
1813

    
1814
    for i in range(100):
1815
      name = "TestLock%s" % i
1816
      locks.append(locking.SharedLock(name, monitor=self.lm))
1817

    
1818
    self.assertEqual(len(self.lm._locks), len(locks))
1819
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1820
    self.assertEqual(len(result.fields), 1)
1821
    self.assertEqual(len(result.data), 100)
1822

    
1823
    # Delete all locks
1824
    del locks[:]
1825

    
1826
    # The garbage collector might needs some time
1827
    def _CheckLocks():
1828
      if self.lm._locks:
1829
        raise utils.RetryAgain()
1830

    
1831
    utils.Retry(_CheckLocks, 0.1, 30.0)
1832

    
1833
    self.assertFalse(self.lm._locks)
1834

    
1835
  def testMultiThread(self):
1836
    locks = []
1837

    
1838
    def _CreateLock(prev, next, name):
1839
      prev.wait()
1840
      locks.append(locking.SharedLock(name, monitor=self.lm))
1841
      if next:
1842
        next.set()
1843

    
1844
    expnames = []
1845

    
1846
    first = threading.Event()
1847
    prev = first
1848

    
1849
    # Use a deterministic random generator
1850
    for i in random.Random(4263).sample(range(100), 33):
1851
      name = "MtTestLock%s" % i
1852
      expnames.append(name)
1853

    
1854
      ev = threading.Event()
1855
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1856
      prev = ev
1857

    
1858
    # Add locks
1859
    first.set()
1860
    self._waitThreads()
1861

    
1862
    # Check order in which locks were added
1863
    self.assertEqual([i.name for i in locks], expnames)
1864

    
1865
    # Check query result
1866
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1867
    self.assert_(isinstance(result, dict))
1868
    response = objects.QueryResponse.FromDict(result)
1869
    self.assertEqual(response.data,
1870
                     [[(constants.RS_NORMAL, name),
1871
                       (constants.RS_NORMAL, None),
1872
                       (constants.RS_NORMAL, None),
1873
                       (constants.RS_NORMAL, [])]
1874
                      for name in utils.NiceSort(expnames)])
1875
    self.assertEqual(len(response.fields), 4)
1876
    self.assertEqual(["name", "mode", "owner", "pending"],
1877
                     [fdef.name for fdef in response.fields])
1878

    
1879
    # Test exclusive acquire
1880
    for tlock in locks[::4]:
1881
      tlock.acquire(shared=0)
1882
      try:
1883
        def _GetExpResult(name):
1884
          if tlock.name == name:
1885
            return [(constants.RS_NORMAL, name),
1886
                    (constants.RS_NORMAL, "exclusive"),
1887
                    (constants.RS_NORMAL,
1888
                     [threading.currentThread().getName()]),
1889
                    (constants.RS_NORMAL, [])]
1890
          return [(constants.RS_NORMAL, name),
1891
                  (constants.RS_NORMAL, None),
1892
                  (constants.RS_NORMAL, None),
1893
                  (constants.RS_NORMAL, [])]
1894

    
1895
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1896
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1897
                         [_GetExpResult(name)
1898
                          for name in utils.NiceSort(expnames)])
1899
      finally:
1900
        tlock.release()
1901

    
1902
    # Test shared acquire
1903
    def _Acquire(lock, shared, ev, notify):
1904
      lock.acquire(shared=shared)
1905
      try:
1906
        notify.set()
1907
        ev.wait()
1908
      finally:
1909
        lock.release()
1910

    
1911
    for tlock1 in locks[::11]:
1912
      for tlock2 in locks[::-15]:
1913
        if tlock2 == tlock1:
1914
          # Avoid deadlocks
1915
          continue
1916

    
1917
        for tlock3 in locks[::10]:
1918
          if tlock3 in (tlock2, tlock1):
1919
            # Avoid deadlocks
1920
            continue
1921

    
1922
          releaseev = threading.Event()
1923

    
1924
          # Acquire locks
1925
          acquireev = []
1926
          tthreads1 = []
1927
          for i in range(3):
1928
            ev = threading.Event()
1929
            tthreads1.append(self._addThread(target=_Acquire,
1930
                                             args=(tlock1, 1, releaseev, ev)))
1931
            acquireev.append(ev)
1932

    
1933
          ev = threading.Event()
1934
          tthread2 = self._addThread(target=_Acquire,
1935
                                     args=(tlock2, 1, releaseev, ev))
1936
          acquireev.append(ev)
1937

    
1938
          ev = threading.Event()
1939
          tthread3 = self._addThread(target=_Acquire,
1940
                                     args=(tlock3, 0, releaseev, ev))
1941
          acquireev.append(ev)
1942

    
1943
          # Wait for all locks to be acquired
1944
          for i in acquireev:
1945
            i.wait()
1946

    
1947
          # Check query result
1948
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1949
          response = objects.QueryResponse.FromDict(result)
1950
          for (name, mode, owner) in response.data:
1951
            (name_status, name_value) = name
1952
            (owner_status, owner_value) = owner
1953

    
1954
            self.assertEqual(name_status, constants.RS_NORMAL)
1955
            self.assertEqual(owner_status, constants.RS_NORMAL)
1956

    
1957
            if name_value == tlock1.name:
1958
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1959
              self.assertEqual(set(owner_value),
1960
                               set(i.getName() for i in tthreads1))
1961
              continue
1962

    
1963
            if name_value == tlock2.name:
1964
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1965
              self.assertEqual(owner_value, [tthread2.getName()])
1966
              continue
1967

    
1968
            if name_value == tlock3.name:
1969
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1970
              self.assertEqual(owner_value, [tthread3.getName()])
1971
              continue
1972

    
1973
            self.assert_(name_value in expnames)
1974
            self.assertEqual(mode, (constants.RS_NORMAL, None))
1975
            self.assert_(owner_value is None)
1976

    
1977
          # Release locks again
1978
          releaseev.set()
1979

    
1980
          self._waitThreads()
1981

    
1982
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1983
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
1984
                           [[(constants.RS_NORMAL, name),
1985
                             (constants.RS_NORMAL, None),
1986
                             (constants.RS_NORMAL, None)]
1987
                            for name in utils.NiceSort(expnames)])
1988

    
1989
  def testDelete(self):
1990
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1991

    
1992
    self.assertEqual(len(self.lm._locks), 1)
1993
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1994
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1995
                     [[(constants.RS_NORMAL, lock.name),
1996
                       (constants.RS_NORMAL, None),
1997
                       (constants.RS_NORMAL, None)]])
1998

    
1999
    lock.delete()
2000

    
2001
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2002
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2003
                     [[(constants.RS_NORMAL, lock.name),
2004
                       (constants.RS_NORMAL, "deleted"),
2005
                       (constants.RS_NORMAL, None)]])
2006
    self.assertEqual(len(self.lm._locks), 1)
2007

    
2008
  def testPending(self):
2009
    def _Acquire(lock, shared, prev, next):
2010
      prev.wait()
2011

    
2012
      lock.acquire(shared=shared, test_notify=next.set)
2013
      try:
2014
        pass
2015
      finally:
2016
        lock.release()
2017

    
2018
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
2019

    
2020
    for shared in [0, 1]:
2021
      lock.acquire()
2022
      try:
2023
        self.assertEqual(len(self.lm._locks), 1)
2024
        result = self.lm.QueryLocks(["name", "mode", "owner"])
2025
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2026
                         [[(constants.RS_NORMAL, lock.name),
2027
                           (constants.RS_NORMAL, "exclusive"),
2028
                           (constants.RS_NORMAL,
2029
                            [threading.currentThread().getName()])]])
2030

    
2031
        threads = []
2032

    
2033
        first = threading.Event()
2034
        prev = first
2035

    
2036
        for i in range(5):
2037
          ev = threading.Event()
2038
          threads.append(self._addThread(target=_Acquire,
2039
                                          args=(lock, shared, prev, ev)))
2040
          prev = ev
2041

    
2042
        # Start acquires
2043
        first.set()
2044

    
2045
        # Wait for last acquire to start waiting
2046
        prev.wait()
2047

    
2048
        # NOTE: This works only because QueryLocks will acquire the
2049
        # lock-internal lock again and won't be able to get the information
2050
        # until it has the lock. By then the acquire should be registered in
2051
        # SharedLock.__pending (otherwise it's a bug).
2052

    
2053
        # All acquires are waiting now
2054
        if shared:
2055
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2056
        else:
2057
          pending = [("exclusive", [t.getName()]) for t in threads]
2058

    
2059
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2060
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2061
                         [[(constants.RS_NORMAL, lock.name),
2062
                           (constants.RS_NORMAL, "exclusive"),
2063
                           (constants.RS_NORMAL,
2064
                            [threading.currentThread().getName()]),
2065
                           (constants.RS_NORMAL, pending)]])
2066

    
2067
        self.assertEqual(len(self.lm._locks), 1)
2068
      finally:
2069
        lock.release()
2070

    
2071
      self._waitThreads()
2072

    
2073
      # No pending acquires
2074
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2075
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2076
                       [[(constants.RS_NORMAL, lock.name),
2077
                         (constants.RS_NORMAL, None),
2078
                         (constants.RS_NORMAL, None),
2079
                         (constants.RS_NORMAL, [])]])
2080

    
2081
      self.assertEqual(len(self.lm._locks), 1)
2082

    
2083
  def testDeleteAndRecreate(self):
2084
    lname = "TestLock101923193"
2085

    
2086
    # Create some locks with the same name and keep all references
2087
    locks = [locking.SharedLock(lname, monitor=self.lm)
2088
             for _ in range(5)]
2089

    
2090
    self.assertEqual(len(self.lm._locks), len(locks))
2091

    
2092
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2093
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2094
                     [[(constants.RS_NORMAL, lname),
2095
                       (constants.RS_NORMAL, None),
2096
                       (constants.RS_NORMAL, None)]] * 5)
2097

    
2098
    locks[2].delete()
2099

    
2100
    # Check information order
2101
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2102
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2103
                     [[(constants.RS_NORMAL, lname),
2104
                       (constants.RS_NORMAL, None),
2105
                       (constants.RS_NORMAL, None)]] * 2 +
2106
                     [[(constants.RS_NORMAL, lname),
2107
                       (constants.RS_NORMAL, "deleted"),
2108
                       (constants.RS_NORMAL, None)]] +
2109
                     [[(constants.RS_NORMAL, lname),
2110
                       (constants.RS_NORMAL, None),
2111
                       (constants.RS_NORMAL, None)]] * 2)
2112

    
2113
    locks[1].acquire(shared=0)
2114

    
2115
    last_status = [
2116
      [(constants.RS_NORMAL, lname),
2117
       (constants.RS_NORMAL, None),
2118
       (constants.RS_NORMAL, None)],
2119
      [(constants.RS_NORMAL, lname),
2120
       (constants.RS_NORMAL, "exclusive"),
2121
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
2122
      [(constants.RS_NORMAL, lname),
2123
       (constants.RS_NORMAL, "deleted"),
2124
       (constants.RS_NORMAL, None)],
2125
      [(constants.RS_NORMAL, lname),
2126
       (constants.RS_NORMAL, None),
2127
       (constants.RS_NORMAL, None)],
2128
      [(constants.RS_NORMAL, lname),
2129
       (constants.RS_NORMAL, None),
2130
       (constants.RS_NORMAL, None)],
2131
      ]
2132

    
2133
    # Check information order
2134
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2135
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2136

    
2137
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2138
    self.assertEqual(len(self.lm._locks), len(locks))
2139

    
2140
    # Check lock deletion
2141
    for idx in range(len(locks)):
2142
      del locks[0]
2143
      assert gc.isenabled()
2144
      gc.collect()
2145
      self.assertEqual(len(self.lm._locks), len(locks))
2146
      result = self.lm.QueryLocks(["name", "mode", "owner"])
2147
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2148
                       last_status[idx + 1:])
2149

    
2150
    # All locks should have been deleted
2151
    assert not locks
2152
    self.assertFalse(self.lm._locks)
2153

    
2154
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2155
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2156

    
2157

    
2158
if __name__ == '__main__':
2159
  testutils.GanetiTestProgram()