Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 26082b7e

History | View | Annotate | Download (76.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190
    # This new thread can't acquire the lock, and thus call wait, before we
191
    # release it
192
    self._addThread(target=fn)
193
    self.cond.notifyAll()
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195
    self.cond.release()
196

    
197
    # We should now get 3 W and 1 A (for the new thread) in whatever order
198
    w = 0
199
    a = 0
200
    for i in range(4):
201
      got = self.done.get(True, 1)
202
      if got == "W":
203
        w += 1
204
      elif got == "A":
205
        a += 1
206
      else:
207
        self.fail("Got %s on the done queue" % got)
208

    
209
    self.assertEqual(w, 3)
210
    self.assertEqual(a, 1)
211

    
212
    self.cond.acquire()
213
    self.cond.notifyAll()
214
    self.cond.release()
215
    self._waitThreads()
216
    self.assertEqual(self.done.get_nowait(), "W")
217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
218

    
219
  def testBlockingWait(self):
220
    def _BlockingWait():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(None)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_BlockingWait)
228

    
229
  def testLongTimeoutWait(self):
230
    def _Helper():
231
      self.cond.acquire()
232
      self.done.put("A")
233
      self.cond.wait(15.0)
234
      self.cond.release()
235
      self.done.put("W")
236

    
237
    self._TestWait(_Helper)
238

    
239
  def _TimeoutWait(self, timeout, check):
240
    self.cond.acquire()
241
    self.cond.wait(timeout)
242
    self.cond.release()
243
    self.done.put(check)
244

    
245
  def testShortTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248
    self._waitThreads()
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertEqual(self.done.get_nowait(), "T1")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253
  def testZeroTimeoutWait(self):
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257
    self._waitThreads()
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertEqual(self.done.get_nowait(), "T0")
261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262

    
263

    
264
class TestSharedLock(_ThreadedTestCase):
265
  """SharedLock tests"""
266

    
267
  def setUp(self):
268
    _ThreadedTestCase.setUp(self)
269
    self.sl = locking.SharedLock("TestSharedLock")
270

    
271
  def testSequenceAndOwnership(self):
272
    self.assertFalse(self.sl._is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl._is_owned())
275
    self.assert_(self.sl._is_owned(shared=1))
276
    self.assertFalse(self.sl._is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl._is_owned())
279
    self.sl.acquire()
280
    self.assert_(self.sl._is_owned())
281
    self.assertFalse(self.sl._is_owned(shared=1))
282
    self.assert_(self.sl._is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl._is_owned())
285
    self.sl.acquire(shared=1)
286
    self.assert_(self.sl._is_owned())
287
    self.assert_(self.sl._is_owned(shared=1))
288
    self.assertFalse(self.sl._is_owned(shared=0))
289
    self.sl.release()
290
    self.assertFalse(self.sl._is_owned())
291

    
292
  def testBooleanValue(self):
293
    # semaphores are supposed to return a true value on a successful acquire
294
    self.assert_(self.sl.acquire(shared=1))
295
    self.sl.release()
296
    self.assert_(self.sl.acquire())
297
    self.sl.release()
298

    
299
  def testDoubleLockingStoE(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  def testDoubleLockingEtoS(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingStoS(self):
308
    self.sl.acquire(shared=1)
309
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310

    
311
  def testDoubleLockingEtoE(self):
312
    self.sl.acquire()
313
    self.assertRaises(AssertionError, self.sl.acquire)
314

    
315
  # helper functions: called in a separate thread they acquire the lock, send
316
  # their identifier on the done queue, then release it.
317
  def _doItSharer(self):
318
    try:
319
      self.sl.acquire(shared=1)
320
      self.done.put('SHR')
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put('ERR')
324

    
325
  def _doItExclusive(self):
326
    try:
327
      self.sl.acquire()
328
      self.done.put('EXC')
329
      self.sl.release()
330
    except errors.LockError:
331
      self.done.put('ERR')
332

    
333
  def _doItDelete(self):
334
    try:
335
      self.sl.delete()
336
      self.done.put('DEL')
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def testSharersCanCoexist(self):
341
    self.sl.acquire(shared=1)
342
    threading.Thread(target=self._doItSharer).start()
343
    self.assert_(self.done.get(True, 1))
344
    self.sl.release()
345

    
346
  @_Repeat
347
  def testExclusiveBlocksExclusive(self):
348
    self.sl.acquire()
349
    self._addThread(target=self._doItExclusive)
350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
351
    self.sl.release()
352
    self._waitThreads()
353
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354

    
355
  @_Repeat
356
  def testExclusiveBlocksDelete(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItDelete)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363
    self.sl = locking.SharedLock(self.sl.name)
364

    
365
  @_Repeat
366
  def testExclusiveBlocksSharer(self):
367
    self.sl.acquire()
368
    self._addThread(target=self._doItSharer)
369
    self.assertRaises(Queue.Empty, self.done.get_nowait)
370
    self.sl.release()
371
    self._waitThreads()
372
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373

    
374
  @_Repeat
375
  def testSharerBlocksExclusive(self):
376
    self.sl.acquire(shared=1)
377
    self._addThread(target=self._doItExclusive)
378
    self.assertRaises(Queue.Empty, self.done.get_nowait)
379
    self.sl.release()
380
    self._waitThreads()
381
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382

    
383
  @_Repeat
384
  def testSharerBlocksDelete(self):
385
    self.sl.acquire(shared=1)
386
    self._addThread(target=self._doItDelete)
387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
388
    self.sl.release()
389
    self._waitThreads()
390
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391
    self.sl = locking.SharedLock(self.sl.name)
392

    
393
  @_Repeat
394
  def testWaitingExclusiveBlocksSharer(self):
395
    """SKIPPED testWaitingExclusiveBlockSharer"""
396
    return
397

    
398
    self.sl.acquire(shared=1)
399
    # the lock is acquired in shared mode...
400
    self._addThread(target=self._doItExclusive)
401
    # ...but now an exclusive is waiting...
402
    self._addThread(target=self._doItSharer)
403
    # ...so the sharer should be blocked as well
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    # The exclusive passed before
408
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410

    
411
  @_Repeat
412
  def testWaitingSharerBlocksExclusive(self):
413
    """SKIPPED testWaitingSharerBlocksExclusive"""
414
    return
415

    
416
    self.sl.acquire()
417
    # the lock is acquired in exclusive mode...
418
    self._addThread(target=self._doItSharer)
419
    # ...but now a sharer is waiting...
420
    self._addThread(target=self._doItExclusive)
421
    # ...the exclusive is waiting too...
422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
423
    self.sl.release()
424
    self._waitThreads()
425
    # The sharer passed before
426
    self.assertEqual(self.done.get_nowait(), 'SHR')
427
    self.assertEqual(self.done.get_nowait(), 'EXC')
428

    
429
  def testDelete(self):
430
    self.sl.delete()
431
    self.assertRaises(errors.LockError, self.sl.acquire)
432
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433
    self.assertRaises(errors.LockError, self.sl.delete)
434

    
435
  def testDeleteTimeout(self):
436
    self.assertTrue(self.sl.delete(timeout=60))
437

    
438
  def testDeleteTimeoutFail(self):
439
    ready = threading.Event()
440
    finish = threading.Event()
441

    
442
    def fn():
443
      self.sl.acquire(shared=0)
444
      ready.set()
445

    
446
      finish.wait()
447
      self.sl.release()
448

    
449
    self._addThread(target=fn)
450
    ready.wait()
451

    
452
    # Test if deleting a lock owned in exclusive mode by another thread fails
453
    # to delete when a timeout is used
454
    self.assertFalse(self.sl.delete(timeout=0.02))
455

    
456
    finish.set()
457
    self._waitThreads()
458

    
459
    self.assertTrue(self.sl.delete())
460
    self.assertRaises(errors.LockError, self.sl.acquire)
461

    
462
  def testNoDeleteIfSharer(self):
463
    self.sl.acquire(shared=1)
464
    self.assertRaises(AssertionError, self.sl.delete)
465

    
466
  @_Repeat
467
  def testDeletePendingSharersExclusiveDelete(self):
468
    self.sl.acquire()
469
    self._addThread(target=self._doItSharer)
470
    self._addThread(target=self._doItSharer)
471
    self._addThread(target=self._doItExclusive)
472
    self._addThread(target=self._doItDelete)
473
    self.sl.delete()
474
    self._waitThreads()
475
    # The threads who were pending return ERR
476
    for _ in range(4):
477
      self.assertEqual(self.done.get_nowait(), 'ERR')
478
    self.sl = locking.SharedLock(self.sl.name)
479

    
480
  @_Repeat
481
  def testDeletePendingDeleteExclusiveSharers(self):
482
    self.sl.acquire()
483
    self._addThread(target=self._doItDelete)
484
    self._addThread(target=self._doItExclusive)
485
    self._addThread(target=self._doItSharer)
486
    self._addThread(target=self._doItSharer)
487
    self.sl.delete()
488
    self._waitThreads()
489
    # The two threads who were pending return both ERR
490
    self.assertEqual(self.done.get_nowait(), 'ERR')
491
    self.assertEqual(self.done.get_nowait(), 'ERR')
492
    self.assertEqual(self.done.get_nowait(), 'ERR')
493
    self.assertEqual(self.done.get_nowait(), 'ERR')
494
    self.sl = locking.SharedLock(self.sl.name)
495

    
496
  @_Repeat
497
  def testExclusiveAcquireTimeout(self):
498
    for shared in [0, 1]:
499
      on_queue = threading.Event()
500
      release_exclusive = threading.Event()
501

    
502
      def _LockExclusive():
503
        self.sl.acquire(shared=0, test_notify=on_queue.set)
504
        self.done.put("A: start wait")
505
        release_exclusive.wait()
506
        self.done.put("A: end wait")
507
        self.sl.release()
508

    
509
      # Start thread to hold lock in exclusive mode
510
      self._addThread(target=_LockExclusive)
511

    
512
      # Wait for wait to begin
513
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
514

    
515
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
516
      # on the queue
517
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
518
                                      test_notify=release_exclusive.set))
519

    
520
      self.done.put("got 2nd")
521
      self.sl.release()
522

    
523
      self._waitThreads()
524

    
525
      self.assertEqual(self.done.get_nowait(), "A: end wait")
526
      self.assertEqual(self.done.get_nowait(), "got 2nd")
527
      self.assertRaises(Queue.Empty, self.done.get_nowait)
528

    
529
  @_Repeat
530
  def testAcquireExpiringTimeout(self):
531
    def _AcquireWithTimeout(shared, timeout):
532
      if not self.sl.acquire(shared=shared, timeout=timeout):
533
        self.done.put("timeout")
534

    
535
    for shared in [0, 1]:
536
      # Lock exclusively
537
      self.sl.acquire()
538

    
539
      # Start shared acquires with timeout between 0 and 20 ms
540
      for i in range(11):
541
        self._addThread(target=_AcquireWithTimeout,
542
                        args=(shared, i * 2.0 / 1000.0))
543

    
544
      # Wait for threads to finish (makes sure the acquire timeout expires
545
      # before releasing the lock)
546
      self._waitThreads()
547

    
548
      # Release lock
549
      self.sl.release()
550

    
551
      for _ in range(11):
552
        self.assertEqual(self.done.get_nowait(), "timeout")
553

    
554
      self.assertRaises(Queue.Empty, self.done.get_nowait)
555

    
556
  @_Repeat
557
  def testSharedSkipExclusiveAcquires(self):
558
    # Tests whether shared acquires jump in front of exclusive acquires in the
559
    # queue.
560

    
561
    def _Acquire(shared, name, notify_ev, wait_ev):
562
      if notify_ev:
563
        notify_fn = notify_ev.set
564
      else:
565
        notify_fn = None
566

    
567
      if wait_ev:
568
        wait_ev.wait()
569

    
570
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
571
        return
572

    
573
      self.done.put(name)
574
      self.sl.release()
575

    
576
    # Get exclusive lock while we fill the queue
577
    self.sl.acquire()
578

    
579
    shrcnt1 = 5
580
    shrcnt2 = 7
581
    shrcnt3 = 9
582
    shrcnt4 = 2
583

    
584
    # Add acquires using threading.Event for synchronization. They'll be
585
    # acquired exactly in the order defined in this list.
586
    acquires = (shrcnt1 * [(1, "shared 1")] +
587
                3 * [(0, "exclusive 1")] +
588
                shrcnt2 * [(1, "shared 2")] +
589
                shrcnt3 * [(1, "shared 3")] +
590
                shrcnt4 * [(1, "shared 4")] +
591
                3 * [(0, "exclusive 2")])
592

    
593
    ev_cur = None
594
    ev_prev = None
595

    
596
    for args in acquires:
597
      ev_cur = threading.Event()
598
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
599
      ev_prev = ev_cur
600

    
601
    # Wait for last acquire to start
602
    ev_prev.wait()
603

    
604
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
605
    # together
606
    self.assertEqual(self.sl._count_pending(), 7)
607

    
608
    # Release exclusive lock and wait
609
    self.sl.release()
610

    
611
    self._waitThreads()
612

    
613
    # Check sequence
614
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
615
      # Shared locks aren't guaranteed to be notified in order, but they'll be
616
      # first
617
      tmp = self.done.get_nowait()
618
      if tmp == "shared 1":
619
        shrcnt1 -= 1
620
      elif tmp == "shared 2":
621
        shrcnt2 -= 1
622
      elif tmp == "shared 3":
623
        shrcnt3 -= 1
624
      elif tmp == "shared 4":
625
        shrcnt4 -= 1
626
    self.assertEqual(shrcnt1, 0)
627
    self.assertEqual(shrcnt2, 0)
628
    self.assertEqual(shrcnt3, 0)
629
    self.assertEqual(shrcnt3, 0)
630

    
631
    for _ in range(3):
632
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
633

    
634
    for _ in range(3):
635
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
636

    
637
    self.assertRaises(Queue.Empty, self.done.get_nowait)
638

    
639
  def testIllegalDowngrade(self):
640
    # Not yet acquired
641
    self.assertRaises(AssertionError, self.sl.downgrade)
642

    
643
    # Acquire in shared mode, downgrade should be no-op
644
    self.assertTrue(self.sl.acquire(shared=1))
645
    self.assertTrue(self.sl._is_owned(shared=1))
646
    self.assertTrue(self.sl.downgrade())
647
    self.assertTrue(self.sl._is_owned(shared=1))
648
    self.sl.release()
649

    
650
  def testDowngrade(self):
651
    self.assertTrue(self.sl.acquire())
652
    self.assertTrue(self.sl._is_owned(shared=0))
653
    self.assertTrue(self.sl.downgrade())
654
    self.assertTrue(self.sl._is_owned(shared=1))
655
    self.sl.release()
656

    
657
  @_Repeat
658
  def testDowngradeJumpsAheadOfExclusive(self):
659
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
660
      self.assertTrue(self.sl.acquire())
661
      self.assertTrue(self.sl._is_owned(shared=0))
662
      ev_got.set()
663
      ev_downgrade.wait()
664
      self.assertTrue(self.sl._is_owned(shared=0))
665
      self.assertTrue(self.sl.downgrade())
666
      self.assertTrue(self.sl._is_owned(shared=1))
667
      ev_release.wait()
668
      self.assertTrue(self.sl._is_owned(shared=1))
669
      self.sl.release()
670

    
671
    def _KeepExclusive2(ev_started, ev_release):
672
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
673
      self.assertTrue(self.sl._is_owned(shared=0))
674
      ev_release.wait()
675
      self.assertTrue(self.sl._is_owned(shared=0))
676
      self.sl.release()
677

    
678
    def _KeepShared(ev_started, ev_got, ev_release):
679
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
680
      self.assertTrue(self.sl._is_owned(shared=1))
681
      ev_got.set()
682
      ev_release.wait()
683
      self.assertTrue(self.sl._is_owned(shared=1))
684
      self.sl.release()
685

    
686
    # Acquire lock in exclusive mode
687
    ev_got_excl1 = threading.Event()
688
    ev_downgrade_excl1 = threading.Event()
689
    ev_release_excl1 = threading.Event()
690
    th_excl1 = self._addThread(target=_KeepExclusive,
691
                               args=(ev_got_excl1, ev_downgrade_excl1,
692
                                     ev_release_excl1))
693
    ev_got_excl1.wait()
694

    
695
    # Start a second exclusive acquire
696
    ev_started_excl2 = threading.Event()
697
    ev_release_excl2 = threading.Event()
698
    th_excl2 = self._addThread(target=_KeepExclusive2,
699
                               args=(ev_started_excl2, ev_release_excl2))
700
    ev_started_excl2.wait()
701

    
702
    # Start shared acquires, will jump ahead of second exclusive acquire when
703
    # first exclusive acquire downgrades
704
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
705
    ev_release_shared = threading.Event()
706

    
707
    th_shared = [self._addThread(target=_KeepShared,
708
                                 args=(ev_started, ev_got, ev_release_shared))
709
                 for (ev_started, ev_got) in ev_shared]
710

    
711
    # Wait for all shared acquires to start
712
    for (ev, _) in ev_shared:
713
      ev.wait()
714

    
715
    # Check lock information
716
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
717
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
718
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
719
    self.assertEqual([(pendmode, sorted(waiting))
720
                      for (pendmode, waiting) in pending],
721
                     [("exclusive", [th_excl2.getName()]),
722
                      ("shared", sorted(th.getName() for th in th_shared))])
723

    
724
    # Shared acquires won't start until the exclusive lock is downgraded
725
    ev_downgrade_excl1.set()
726

    
727
    # Wait for all shared acquires to be successful
728
    for (_, ev) in ev_shared:
729
      ev.wait()
730

    
731
    # Check lock information again
732
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
733
                                              query.LQ_PENDING])),
734
                     [(self.sl.name, "shared", None,
735
                       [("exclusive", [th_excl2.getName()])])])
736
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
737
    self.assertEqual(set(owner), set([th_excl1.getName()] +
738
                                     [th.getName() for th in th_shared]))
739

    
740
    ev_release_excl1.set()
741
    ev_release_excl2.set()
742
    ev_release_shared.set()
743

    
744
    self._waitThreads()
745

    
746
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
747
                                              query.LQ_PENDING])),
748
                     [(self.sl.name, None, None, [])])
749

    
750
  @_Repeat
751
  def testMixedAcquireTimeout(self):
752
    sync = threading.Event()
753

    
754
    def _AcquireShared(ev):
755
      if not self.sl.acquire(shared=1, timeout=None):
756
        return
757

    
758
      self.done.put("shared")
759

    
760
      # Notify main thread
761
      ev.set()
762

    
763
      # Wait for notification from main thread
764
      sync.wait()
765

    
766
      # Release lock
767
      self.sl.release()
768

    
769
    acquires = []
770
    for _ in range(3):
771
      ev = threading.Event()
772
      self._addThread(target=_AcquireShared, args=(ev, ))
773
      acquires.append(ev)
774

    
775
    # Wait for all acquires to finish
776
    for i in acquires:
777
      i.wait()
778

    
779
    self.assertEqual(self.sl._count_pending(), 0)
780

    
781
    # Try to get exclusive lock
782
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
783

    
784
    # Acquire exclusive without timeout
785
    exclsync = threading.Event()
786
    exclev = threading.Event()
787

    
788
    def _AcquireExclusive():
789
      if not self.sl.acquire(shared=0):
790
        return
791

    
792
      self.done.put("exclusive")
793

    
794
      # Notify main thread
795
      exclev.set()
796

    
797
      # Wait for notification from main thread
798
      exclsync.wait()
799

    
800
      self.sl.release()
801

    
802
    self._addThread(target=_AcquireExclusive)
803

    
804
    # Try to get exclusive lock
805
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
806

    
807
    # Make all shared holders release their locks
808
    sync.set()
809

    
810
    # Wait for exclusive acquire to succeed
811
    exclev.wait()
812

    
813
    self.assertEqual(self.sl._count_pending(), 0)
814

    
815
    # Try to get exclusive lock
816
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
817

    
818
    def _AcquireSharedSimple():
819
      if self.sl.acquire(shared=1, timeout=None):
820
        self.done.put("shared2")
821
        self.sl.release()
822

    
823
    for _ in range(10):
824
      self._addThread(target=_AcquireSharedSimple)
825

    
826
    # Tell exclusive lock to release
827
    exclsync.set()
828

    
829
    # Wait for everything to finish
830
    self._waitThreads()
831

    
832
    self.assertEqual(self.sl._count_pending(), 0)
833

    
834
    # Check sequence
835
    for _ in range(3):
836
      self.assertEqual(self.done.get_nowait(), "shared")
837

    
838
    self.assertEqual(self.done.get_nowait(), "exclusive")
839

    
840
    for _ in range(10):
841
      self.assertEqual(self.done.get_nowait(), "shared2")
842

    
843
    self.assertRaises(Queue.Empty, self.done.get_nowait)
844

    
845
  def testPriority(self):
846
    # Acquire in exclusive mode
847
    self.assert_(self.sl.acquire(shared=0))
848

    
849
    # Queue acquires
850
    def _Acquire(prev, next, shared, priority, result):
851
      prev.wait()
852
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
853
      try:
854
        self.done.put(result)
855
      finally:
856
        self.sl.release()
857

    
858
    counter = itertools.count(0)
859
    priorities = range(-20, 30)
860
    first = threading.Event()
861
    prev = first
862

    
863
    # Data structure:
864
    # {
865
    #   priority:
866
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
867
    #      (shared/exclusive, ...),
868
    #      ...,
869
    #     ],
870
    # }
871
    perprio = {}
872

    
873
    # References shared acquire per priority in L{perprio}. Data structure:
874
    # {
875
    #   priority: (shared=1, set(acquire names), set(pending threads)),
876
    # }
877
    prioshared = {}
878

    
879
    for seed in [4979, 9523, 14902, 32440]:
880
      # Use a deterministic random generator
881
      rnd = random.Random(seed)
882
      for priority in [rnd.choice(priorities) for _ in range(30)]:
883
        modes = [0, 1]
884
        rnd.shuffle(modes)
885
        for shared in modes:
886
          # Unique name
887
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
888

    
889
          ev = threading.Event()
890
          thread = self._addThread(target=_Acquire,
891
                                   args=(prev, ev, shared, priority, acqname))
892
          prev = ev
893

    
894
          # Record expected aqcuire, see above for structure
895
          data = (shared, set([acqname]), set([thread]))
896
          priolist = perprio.setdefault(priority, [])
897
          if shared:
898
            priosh = prioshared.get(priority, None)
899
            if priosh:
900
              # Shared acquires are merged
901
              for i, j in zip(priosh[1:], data[1:]):
902
                i.update(j)
903
              assert data[0] == priosh[0]
904
            else:
905
              prioshared[priority] = data
906
              priolist.append(data)
907
          else:
908
            priolist.append(data)
909

    
910
    # Start all acquires and wait for them
911
    first.set()
912
    prev.wait()
913

    
914
    # Check lock information
915
    self.assertEqual(self.sl.GetLockInfo(set()),
916
                     [(self.sl.name, None, None, None)])
917
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
918
                     [(self.sl.name, "exclusive",
919
                       [threading.currentThread().getName()], None)])
920

    
921
    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
922
                            perprio)
923

    
924
    # Let threads acquire the lock
925
    self.sl.release()
926

    
927
    # Wait for everything to finish
928
    self._waitThreads()
929

    
930
    self.assert_(self.sl._check_empty())
931

    
932
    # Check acquires by priority
933
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
934
      for (_, names, _) in acquires:
935
        # For shared acquires, the set will contain 1..n entries. For exclusive
936
        # acquires only one.
937
        while names:
938
          names.remove(self.done.get_nowait())
939
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
940

    
941
    self.assertRaises(Queue.Empty, self.done.get_nowait)
942

    
943
  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
944
    self.assertEqual(name, self.sl.name)
945
    self.assert_(mode is None)
946
    self.assert_(owner is None)
947

    
948
    self.assertEqual([(pendmode, sorted(waiting))
949
                      for (pendmode, waiting) in pending],
950
                     [(["exclusive", "shared"][int(bool(shared))],
951
                       sorted(t.getName() for t in threads))
952
                      for acquires in [perprio[i]
953
                                       for i in sorted(perprio.keys())]
954
                      for (shared, _, threads) in acquires])
955

    
956

    
957
class TestSharedLockInCondition(_ThreadedTestCase):
958
  """SharedLock as a condition lock tests"""
959

    
960
  def setUp(self):
961
    _ThreadedTestCase.setUp(self)
962
    self.sl = locking.SharedLock("TestSharedLockInCondition")
963
    self.setCondition()
964

    
965
  def setCondition(self):
966
    self.cond = threading.Condition(self.sl)
967

    
968
  def testKeepMode(self):
969
    self.cond.acquire(shared=1)
970
    self.assert_(self.sl._is_owned(shared=1))
971
    self.cond.wait(0)
972
    self.assert_(self.sl._is_owned(shared=1))
973
    self.cond.release()
974
    self.cond.acquire(shared=0)
975
    self.assert_(self.sl._is_owned(shared=0))
976
    self.cond.wait(0)
977
    self.assert_(self.sl._is_owned(shared=0))
978
    self.cond.release()
979

    
980

    
981
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
982
  """SharedLock as a pipe condition lock tests"""
983

    
984
  def setCondition(self):
985
    self.cond = locking.PipeCondition(self.sl)
986

    
987

    
988
class TestSSynchronizedDecorator(_ThreadedTestCase):
989
  """Shared Lock Synchronized decorator test"""
990

    
991
  def setUp(self):
992
    _ThreadedTestCase.setUp(self)
993

    
994
  @locking.ssynchronized(_decoratorlock)
995
  def _doItExclusive(self):
996
    self.assert_(_decoratorlock._is_owned())
997
    self.done.put('EXC')
998

    
999
  @locking.ssynchronized(_decoratorlock, shared=1)
1000
  def _doItSharer(self):
1001
    self.assert_(_decoratorlock._is_owned(shared=1))
1002
    self.done.put('SHR')
1003

    
1004
  def testDecoratedFunctions(self):
1005
    self._doItExclusive()
1006
    self.assertFalse(_decoratorlock._is_owned())
1007
    self._doItSharer()
1008
    self.assertFalse(_decoratorlock._is_owned())
1009

    
1010
  def testSharersCanCoexist(self):
1011
    _decoratorlock.acquire(shared=1)
1012
    threading.Thread(target=self._doItSharer).start()
1013
    self.assert_(self.done.get(True, 1))
1014
    _decoratorlock.release()
1015

    
1016
  @_Repeat
1017
  def testExclusiveBlocksExclusive(self):
1018
    _decoratorlock.acquire()
1019
    self._addThread(target=self._doItExclusive)
1020
    # give it a bit of time to check that it's not actually doing anything
1021
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1022
    _decoratorlock.release()
1023
    self._waitThreads()
1024
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1025

    
1026
  @_Repeat
1027
  def testExclusiveBlocksSharer(self):
1028
    _decoratorlock.acquire()
1029
    self._addThread(target=self._doItSharer)
1030
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1031
    _decoratorlock.release()
1032
    self._waitThreads()
1033
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1034

    
1035
  @_Repeat
1036
  def testSharerBlocksExclusive(self):
1037
    _decoratorlock.acquire(shared=1)
1038
    self._addThread(target=self._doItExclusive)
1039
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1040
    _decoratorlock.release()
1041
    self._waitThreads()
1042
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1043

    
1044

    
1045
class TestLockSet(_ThreadedTestCase):
1046
  """LockSet tests"""
1047

    
1048
  def setUp(self):
1049
    _ThreadedTestCase.setUp(self)
1050
    self._setUpLS()
1051

    
1052
  def _setUpLS(self):
1053
    """Helper to (re)initialize the lock set"""
1054
    self.resources = ['one', 'two', 'three']
1055
    self.ls = locking.LockSet(self.resources, "TestLockSet")
1056

    
1057
  def testResources(self):
1058
    self.assertEquals(self.ls._names(), set(self.resources))
1059
    newls = locking.LockSet([], "TestLockSet.testResources")
1060
    self.assertEquals(newls._names(), set())
1061

    
1062
  def testAcquireRelease(self):
1063
    self.assert_(self.ls.acquire('one'))
1064
    self.assertEquals(self.ls._list_owned(), set(['one']))
1065
    self.ls.release()
1066
    self.assertEquals(self.ls._list_owned(), set())
1067
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
1068
    self.assertEquals(self.ls._list_owned(), set(['one']))
1069
    self.ls.release()
1070
    self.assertEquals(self.ls._list_owned(), set())
1071
    self.ls.acquire(['one', 'two', 'three'])
1072
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1073
    self.ls.release('one')
1074
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
1075
    self.ls.release(['three'])
1076
    self.assertEquals(self.ls._list_owned(), set(['two']))
1077
    self.ls.release()
1078
    self.assertEquals(self.ls._list_owned(), set())
1079
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1080
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
1081
    self.ls.release()
1082
    self.assertEquals(self.ls._list_owned(), set())
1083

    
1084
  def testNoDoubleAcquire(self):
1085
    self.ls.acquire('one')
1086
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
1087
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1088
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1089
    self.ls.release()
1090
    self.ls.acquire(['one', 'three'])
1091
    self.ls.release('one')
1092
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1093
    self.ls.release('three')
1094

    
1095
  def testNoWrongRelease(self):
1096
    self.assertRaises(AssertionError, self.ls.release)
1097
    self.ls.acquire('one')
1098
    self.assertRaises(AssertionError, self.ls.release, 'two')
1099

    
1100
  def testAddRemove(self):
1101
    self.ls.add('four')
1102
    self.assertEquals(self.ls._list_owned(), set())
1103
    self.assert_('four' in self.ls._names())
1104
    self.ls.add(['five', 'six', 'seven'], acquired=1)
1105
    self.assert_('five' in self.ls._names())
1106
    self.assert_('six' in self.ls._names())
1107
    self.assert_('seven' in self.ls._names())
1108
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
1109
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1110
    self.assert_('five' not in self.ls._names())
1111
    self.assert_('six' not in self.ls._names())
1112
    self.assertEquals(self.ls._list_owned(), set(['seven']))
1113
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1114
    self.ls.remove('seven')
1115
    self.assert_('seven' not in self.ls._names())
1116
    self.assertEquals(self.ls._list_owned(), set([]))
1117
    self.ls.acquire(None, shared=1)
1118
    self.assertRaises(AssertionError, self.ls.add, 'eight')
1119
    self.ls.release()
1120
    self.ls.acquire(None)
1121
    self.ls.add('eight', acquired=1)
1122
    self.assert_('eight' in self.ls._names())
1123
    self.assert_('eight' in self.ls._list_owned())
1124
    self.ls.add('nine')
1125
    self.assert_('nine' in self.ls._names())
1126
    self.assert_('nine' not in self.ls._list_owned())
1127
    self.ls.release()
1128
    self.ls.remove(['two'])
1129
    self.assert_('two' not in self.ls._names())
1130
    self.ls.acquire('three')
1131
    self.assertEquals(self.ls.remove(['three']), ['three'])
1132
    self.assert_('three' not in self.ls._names())
1133
    self.assertEquals(self.ls.remove('three'), [])
1134
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1135
    self.assert_('one' not in self.ls._names())
1136

    
1137
  def testRemoveNonBlocking(self):
1138
    self.ls.acquire('one')
1139
    self.assertEquals(self.ls.remove('one'), ['one'])
1140
    self.ls.acquire(['two', 'three'])
1141
    self.assertEquals(self.ls.remove(['two', 'three']),
1142
                      ['two', 'three'])
1143

    
1144
  def testNoDoubleAdd(self):
1145
    self.assertRaises(errors.LockError, self.ls.add, 'two')
1146
    self.ls.add('four')
1147
    self.assertRaises(errors.LockError, self.ls.add, 'four')
1148

    
1149
  def testNoWrongRemoves(self):
1150
    self.ls.acquire(['one', 'three'], shared=1)
1151
    # Cannot remove 'two' while holding something which is not a superset
1152
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1153
    # Cannot remove 'three' as we are sharing it
1154
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1155

    
1156
  def testAcquireSetLock(self):
1157
    # acquire the set-lock exclusively
1158
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1159
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1160
    self.assertEquals(self.ls._is_owned(), True)
1161
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1162
    # I can still add/remove elements...
1163
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1164
    self.assert_(self.ls.add('six'))
1165
    self.ls.release()
1166
    # share the set-lock
1167
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1168
    # adding new elements is not possible
1169
    self.assertRaises(AssertionError, self.ls.add, 'five')
1170
    self.ls.release()
1171

    
1172
  def testAcquireWithRepetitions(self):
1173
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1174
                      set(['two', 'two', 'three']))
1175
    self.ls.release(['two', 'two'])
1176
    self.assertEquals(self.ls._list_owned(), set(['three']))
1177

    
1178
  def testEmptyAcquire(self):
1179
    # Acquire an empty list of locks...
1180
    self.assertEquals(self.ls.acquire([]), set())
1181
    self.assertEquals(self.ls._list_owned(), set())
1182
    # New locks can still be addded
1183
    self.assert_(self.ls.add('six'))
1184
    # "re-acquiring" is not an issue, since we had really acquired nothing
1185
    self.assertEquals(self.ls.acquire([], shared=1), set())
1186
    self.assertEquals(self.ls._list_owned(), set())
1187
    # We haven't really acquired anything, so we cannot release
1188
    self.assertRaises(AssertionError, self.ls.release)
1189

    
1190
  def _doLockSet(self, names, shared):
1191
    try:
1192
      self.ls.acquire(names, shared=shared)
1193
      self.done.put('DONE')
1194
      self.ls.release()
1195
    except errors.LockError:
1196
      self.done.put('ERR')
1197

    
1198
  def _doAddSet(self, names):
1199
    try:
1200
      self.ls.add(names, acquired=1)
1201
      self.done.put('DONE')
1202
      self.ls.release()
1203
    except errors.LockError:
1204
      self.done.put('ERR')
1205

    
1206
  def _doRemoveSet(self, names):
1207
    self.done.put(self.ls.remove(names))
1208

    
1209
  @_Repeat
1210
  def testConcurrentSharedAcquire(self):
1211
    self.ls.acquire(['one', 'two'], shared=1)
1212
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1213
    self._waitThreads()
1214
    self.assertEqual(self.done.get_nowait(), 'DONE')
1215
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1216
    self._waitThreads()
1217
    self.assertEqual(self.done.get_nowait(), 'DONE')
1218
    self._addThread(target=self._doLockSet, args=('three', 1))
1219
    self._waitThreads()
1220
    self.assertEqual(self.done.get_nowait(), 'DONE')
1221
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1222
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1223
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1224
    self.ls.release()
1225
    self._waitThreads()
1226
    self.assertEqual(self.done.get_nowait(), 'DONE')
1227
    self.assertEqual(self.done.get_nowait(), 'DONE')
1228

    
1229
  @_Repeat
1230
  def testConcurrentExclusiveAcquire(self):
1231
    self.ls.acquire(['one', 'two'])
1232
    self._addThread(target=self._doLockSet, args=('three', 1))
1233
    self._waitThreads()
1234
    self.assertEqual(self.done.get_nowait(), 'DONE')
1235
    self._addThread(target=self._doLockSet, args=('three', 0))
1236
    self._waitThreads()
1237
    self.assertEqual(self.done.get_nowait(), 'DONE')
1238
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1239
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1240
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1241
    self._addThread(target=self._doLockSet, args=('one', 0))
1242
    self._addThread(target=self._doLockSet, args=('one', 1))
1243
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1244
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1245
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1246
    self.ls.release()
1247
    self._waitThreads()
1248
    for _ in range(6):
1249
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1250

    
1251
  @_Repeat
1252
  def testSimpleAcquireTimeoutExpiring(self):
1253
    names = sorted(self.ls._names())
1254
    self.assert_(len(names) >= 3)
1255

    
1256
    # Get name of first lock
1257
    first = names[0]
1258

    
1259
    # Get name of last lock
1260
    last = names.pop()
1261

    
1262
    checks = [
1263
      # Block first and try to lock it again
1264
      (first, first),
1265

    
1266
      # Block last and try to lock all locks
1267
      (None, first),
1268

    
1269
      # Block last and try to lock it again
1270
      (last, last),
1271
      ]
1272

    
1273
    for (wanted, block) in checks:
1274
      # Lock in exclusive mode
1275
      self.assert_(self.ls.acquire(block, shared=0))
1276

    
1277
      def _AcquireOne():
1278
        # Try to get the same lock again with a timeout (should never succeed)
1279
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1280
        if acquired:
1281
          self.done.put("acquired")
1282
          self.ls.release()
1283
        else:
1284
          self.assert_(acquired is None)
1285
          self.assertFalse(self.ls._list_owned())
1286
          self.assertFalse(self.ls._is_owned())
1287
          self.done.put("not acquired")
1288

    
1289
      self._addThread(target=_AcquireOne)
1290

    
1291
      # Wait for timeout in thread to expire
1292
      self._waitThreads()
1293

    
1294
      # Release exclusive lock again
1295
      self.ls.release()
1296

    
1297
      self.assertEqual(self.done.get_nowait(), "not acquired")
1298
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1299

    
1300
  @_Repeat
1301
  def testDelayedAndExpiringLockAcquire(self):
1302
    self._setUpLS()
1303
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1304

    
1305
    for expire in (False, True):
1306
      names = sorted(self.ls._names())
1307
      self.assertEqual(len(names), 8)
1308

    
1309
      lock_ev = dict([(i, threading.Event()) for i in names])
1310

    
1311
      # Lock all in exclusive mode
1312
      self.assert_(self.ls.acquire(names, shared=0))
1313

    
1314
      if expire:
1315
        # We'll wait at least 300ms per lock
1316
        lockwait = len(names) * [0.3]
1317

    
1318
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1319
        # this gives us up to 2.4s to fail.
1320
        lockall_timeout = 0.4
1321
      else:
1322
        # This should finish rather quickly
1323
        lockwait = None
1324
        lockall_timeout = len(names) * 5.0
1325

    
1326
      def _LockAll():
1327
        def acquire_notification(name):
1328
          if not expire:
1329
            self.done.put("getting %s" % name)
1330

    
1331
          # Kick next lock
1332
          lock_ev[name].set()
1333

    
1334
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1335
                           test_notify=acquire_notification):
1336
          self.done.put("got all")
1337
          self.ls.release()
1338
        else:
1339
          self.done.put("timeout on all")
1340

    
1341
        # Notify all locks
1342
        for ev in lock_ev.values():
1343
          ev.set()
1344

    
1345
      t = self._addThread(target=_LockAll)
1346

    
1347
      for idx, name in enumerate(names):
1348
        # Wait for actual acquire on this lock to start
1349
        lock_ev[name].wait(10.0)
1350

    
1351
        if expire and t.isAlive():
1352
          # Wait some time after getting the notification to make sure the lock
1353
          # acquire will expire
1354
          SafeSleep(lockwait[idx])
1355

    
1356
        self.ls.release(names=name)
1357

    
1358
      self.assertFalse(self.ls._list_owned())
1359

    
1360
      self._waitThreads()
1361

    
1362
      if expire:
1363
        # Not checking which locks were actually acquired. Doing so would be
1364
        # too timing-dependant.
1365
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1366
      else:
1367
        for i in names:
1368
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1369
        self.assertEqual(self.done.get_nowait(), "got all")
1370
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1371

    
1372
  @_Repeat
1373
  def testConcurrentRemove(self):
1374
    self.ls.add('four')
1375
    self.ls.acquire(['one', 'two', 'four'])
1376
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1377
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1378
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1379
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1380
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1381
    self.ls.remove('one')
1382
    self.ls.release()
1383
    self._waitThreads()
1384
    for i in range(4):
1385
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1386
    self.ls.add(['five', 'six'], acquired=1)
1387
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1388
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1389
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1390
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1391
    self.ls.remove('five')
1392
    self.ls.release()
1393
    self._waitThreads()
1394
    for i in range(4):
1395
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1396
    self.ls.acquire(['three', 'four'])
1397
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1398
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1399
    self.ls.remove('four')
1400
    self._waitThreads()
1401
    self.assertEqual(self.done.get_nowait(), ['six'])
1402
    self._addThread(target=self._doRemoveSet, args=(['two']))
1403
    self._waitThreads()
1404
    self.assertEqual(self.done.get_nowait(), ['two'])
1405
    self.ls.release()
1406
    # reset lockset
1407
    self._setUpLS()
1408

    
1409
  @_Repeat
1410
  def testConcurrentSharedSetLock(self):
1411
    # share the set-lock...
1412
    self.ls.acquire(None, shared=1)
1413
    # ...another thread can share it too
1414
    self._addThread(target=self._doLockSet, args=(None, 1))
1415
    self._waitThreads()
1416
    self.assertEqual(self.done.get_nowait(), 'DONE')
1417
    # ...or just share some elements
1418
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1419
    self._waitThreads()
1420
    self.assertEqual(self.done.get_nowait(), 'DONE')
1421
    # ...but not add new ones or remove any
1422
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1423
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1424
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1425
    # this just releases the set-lock
1426
    self.ls.release([])
1427
    t.join(60)
1428
    self.assertEqual(self.done.get_nowait(), 'DONE')
1429
    # release the lock on the actual elements so remove() can proceed too
1430
    self.ls.release()
1431
    self._waitThreads()
1432
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1433
    # reset lockset
1434
    self._setUpLS()
1435

    
1436
  @_Repeat
1437
  def testConcurrentExclusiveSetLock(self):
1438
    # acquire the set-lock...
1439
    self.ls.acquire(None, shared=0)
1440
    # ...no one can do anything else
1441
    self._addThread(target=self._doLockSet, args=(None, 1))
1442
    self._addThread(target=self._doLockSet, args=(None, 0))
1443
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1444
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1445
    self._addThread(target=self._doAddSet, args=(['nine']))
1446
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1447
    self.ls.release()
1448
    self._waitThreads()
1449
    for _ in range(5):
1450
      self.assertEqual(self.done.get(True, 1), 'DONE')
1451
    # cleanup
1452
    self._setUpLS()
1453

    
1454
  @_Repeat
1455
  def testConcurrentSetLockAdd(self):
1456
    self.ls.acquire('one')
1457
    # Another thread wants the whole SetLock
1458
    self._addThread(target=self._doLockSet, args=(None, 0))
1459
    self._addThread(target=self._doLockSet, args=(None, 1))
1460
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1461
    self.assertRaises(AssertionError, self.ls.add, 'four')
1462
    self.ls.release()
1463
    self._waitThreads()
1464
    self.assertEqual(self.done.get_nowait(), 'DONE')
1465
    self.assertEqual(self.done.get_nowait(), 'DONE')
1466
    self.ls.acquire(None)
1467
    self._addThread(target=self._doLockSet, args=(None, 0))
1468
    self._addThread(target=self._doLockSet, args=(None, 1))
1469
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1470
    self.ls.add('four')
1471
    self.ls.add('five', acquired=1)
1472
    self.ls.add('six', acquired=1, shared=1)
1473
    self.assertEquals(self.ls._list_owned(),
1474
      set(['one', 'two', 'three', 'five', 'six']))
1475
    self.assertEquals(self.ls._is_owned(), True)
1476
    self.assertEquals(self.ls._names(),
1477
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1478
    self.ls.release()
1479
    self._waitThreads()
1480
    self.assertEqual(self.done.get_nowait(), 'DONE')
1481
    self.assertEqual(self.done.get_nowait(), 'DONE')
1482
    self._setUpLS()
1483

    
1484
  @_Repeat
1485
  def testEmptyLockSet(self):
1486
    # get the set-lock
1487
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1488
    # now empty it...
1489
    self.ls.remove(['one', 'two', 'three'])
1490
    # and adds/locks by another thread still wait
1491
    self._addThread(target=self._doAddSet, args=(['nine']))
1492
    self._addThread(target=self._doLockSet, args=(None, 1))
1493
    self._addThread(target=self._doLockSet, args=(None, 0))
1494
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1495
    self.ls.release()
1496
    self._waitThreads()
1497
    for _ in range(3):
1498
      self.assertEqual(self.done.get_nowait(), 'DONE')
1499
    # empty it again...
1500
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1501
    # now share it...
1502
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1503
    # other sharers can go, adds still wait
1504
    self._addThread(target=self._doLockSet, args=(None, 1))
1505
    self._waitThreads()
1506
    self.assertEqual(self.done.get_nowait(), 'DONE')
1507
    self._addThread(target=self._doAddSet, args=(['nine']))
1508
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1509
    self.ls.release()
1510
    self._waitThreads()
1511
    self.assertEqual(self.done.get_nowait(), 'DONE')
1512
    self._setUpLS()
1513

    
1514
  def testAcquireWithNamesDowngrade(self):
1515
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1516
    self.assertTrue(self.ls._is_owned())
1517
    self.assertFalse(self.ls._get_lock()._is_owned())
1518
    self.ls.release()
1519
    self.assertFalse(self.ls._is_owned())
1520
    self.assertFalse(self.ls._get_lock()._is_owned())
1521
    # Can't downgrade after releasing
1522
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1523

    
1524
  def testDowngrade(self):
1525
    # Not owning anything, must raise an exception
1526
    self.assertFalse(self.ls._is_owned())
1527
    self.assertRaises(AssertionError, self.ls.downgrade)
1528

    
1529
    self.assertFalse(compat.any(i._is_owned()
1530
                                for i in self.ls._get_lockdict().values()))
1531

    
1532
    self.assertEquals(self.ls.acquire(None, shared=0),
1533
                      set(["one", "two", "three"]))
1534
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1535

    
1536
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1537
    self.assertTrue(compat.all(i._is_owned(shared=0)
1538
                               for i in self.ls._get_lockdict().values()))
1539

    
1540
    # Start downgrading locks
1541
    self.assertTrue(self.ls.downgrade(names=["one"]))
1542
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1543
    self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1544
                               for name, lock in
1545
                                 self.ls._get_lockdict().items()))
1546

    
1547
    self.assertTrue(self.ls.downgrade(names="two"))
1548
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1549
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1550
    self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1551
                               for name, lock in
1552
                                 self.ls._get_lockdict().items()))
1553

    
1554
    # Downgrading the last exclusive lock to shared must downgrade the
1555
    # lockset-internal lock too
1556
    self.assertTrue(self.ls.downgrade(names="three"))
1557
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1558
    self.assertTrue(compat.all(i._is_owned(shared=1)
1559
                               for i in self.ls._get_lockdict().values()))
1560

    
1561
    # Downgrading a shared lock must be a no-op
1562
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1563
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1564
    self.assertTrue(compat.all(i._is_owned(shared=1)
1565
                               for i in self.ls._get_lockdict().values()))
1566

    
1567
    self.ls.release()
1568

    
1569
  def testPriority(self):
1570
    def _Acquire(prev, next, name, priority, success_fn):
1571
      prev.wait()
1572
      self.assert_(self.ls.acquire(name, shared=0,
1573
                                   priority=priority,
1574
                                   test_notify=lambda _: next.set()))
1575
      try:
1576
        success_fn()
1577
      finally:
1578
        self.ls.release()
1579

    
1580
    # Get all in exclusive mode
1581
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1582

    
1583
    done_two = Queue.Queue(0)
1584

    
1585
    first = threading.Event()
1586
    prev = first
1587

    
1588
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1589
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1590

    
1591
    # Use a deterministic random generator
1592
    random.Random(741).shuffle(acquires)
1593

    
1594
    for (name, prio, done) in acquires:
1595
      ev = threading.Event()
1596
      self._addThread(target=_Acquire,
1597
                      args=(prev, ev, name, prio,
1598
                            compat.partial(done.put, "Prio%s" % prio)))
1599
      prev = ev
1600

    
1601
    # Start acquires
1602
    first.set()
1603

    
1604
    # Wait for last acquire to start
1605
    prev.wait()
1606

    
1607
    # Let threads acquire locks
1608
    self.ls.release()
1609

    
1610
    # Wait for threads to finish
1611
    self._waitThreads()
1612

    
1613
    for i in range(1, 33):
1614
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1615
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1616

    
1617
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1618
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1619

    
1620

    
1621
class TestGanetiLockManager(_ThreadedTestCase):
1622

    
1623
  def setUp(self):
1624
    _ThreadedTestCase.setUp(self)
1625
    self.nodes=['n1', 'n2']
1626
    self.nodegroups=['g1', 'g2']
1627
    self.instances=['i1', 'i2', 'i3']
1628
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1629
                                        self.instances)
1630

    
1631
  def tearDown(self):
1632
    # Don't try this at home...
1633
    locking.GanetiLockManager._instance = None
1634

    
1635
  def testLockingConstants(self):
1636
    # The locking library internally cheats by assuming its constants have some
1637
    # relationships with each other. Check those hold true.
1638
    # This relationship is also used in the Processor to recursively acquire
1639
    # the right locks. Again, please don't break it.
1640
    for i in range(len(locking.LEVELS)):
1641
      self.assertEqual(i, locking.LEVELS[i])
1642

    
1643
  def testDoubleGLFails(self):
1644
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1645

    
1646
  def testLockNames(self):
1647
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1648
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1649
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1650
                     set(self.nodegroups))
1651
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1652
                     set(self.instances))
1653

    
1654
  def testInitAndResources(self):
1655
    locking.GanetiLockManager._instance = None
1656
    self.GL = locking.GanetiLockManager([], [], [])
1657
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1658
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1659
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1660
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1661

    
1662
    locking.GanetiLockManager._instance = None
1663
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1664
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1665
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1666
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1667
                                    set(self.nodegroups))
1668
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1669

    
1670
    locking.GanetiLockManager._instance = None
1671
    self.GL = locking.GanetiLockManager([], [], self.instances)
1672
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1673
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1674
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1675
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1676
                     set(self.instances))
1677

    
1678
  def testAcquireRelease(self):
1679
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1680
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1681
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1682
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1683
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1684
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1685
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1686
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1687
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1688
    self.GL.release(locking.LEVEL_NODE)
1689
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1690
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1691
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1692
    self.GL.release(locking.LEVEL_NODEGROUP)
1693
    self.GL.release(locking.LEVEL_INSTANCE)
1694
    self.assertRaises(errors.LockError, self.GL.acquire,
1695
                      locking.LEVEL_INSTANCE, ['i5'])
1696
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1697
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1698

    
1699
  def testAcquireWholeSets(self):
1700
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1701
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1702
                      set(self.instances))
1703
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1704
                      set(self.instances))
1705
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1706
                      set(self.nodegroups))
1707
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1708
                      set(self.nodegroups))
1709
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1710
                      set(self.nodes))
1711
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1712
                      set(self.nodes))
1713
    self.GL.release(locking.LEVEL_NODE)
1714
    self.GL.release(locking.LEVEL_NODEGROUP)
1715
    self.GL.release(locking.LEVEL_INSTANCE)
1716
    self.GL.release(locking.LEVEL_CLUSTER)
1717

    
1718
  def testAcquireWholeAndPartial(self):
1719
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1720
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1721
                      set(self.instances))
1722
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1723
                      set(self.instances))
1724
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1725
                      set(['n2']))
1726
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1727
                      set(['n2']))
1728
    self.GL.release(locking.LEVEL_NODE)
1729
    self.GL.release(locking.LEVEL_INSTANCE)
1730
    self.GL.release(locking.LEVEL_CLUSTER)
1731

    
1732
  def testBGLDependency(self):
1733
    self.assertRaises(AssertionError, self.GL.acquire,
1734
                      locking.LEVEL_NODE, ['n1', 'n2'])
1735
    self.assertRaises(AssertionError, self.GL.acquire,
1736
                      locking.LEVEL_INSTANCE, ['i3'])
1737
    self.assertRaises(AssertionError, self.GL.acquire,
1738
                      locking.LEVEL_NODEGROUP, ['g1'])
1739
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1740
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1741
    self.assertRaises(AssertionError, self.GL.release,
1742
                      locking.LEVEL_CLUSTER, ['BGL'])
1743
    self.assertRaises(AssertionError, self.GL.release,
1744
                      locking.LEVEL_CLUSTER)
1745
    self.GL.release(locking.LEVEL_NODE)
1746
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1747
    self.assertRaises(AssertionError, self.GL.release,
1748
                      locking.LEVEL_CLUSTER, ['BGL'])
1749
    self.assertRaises(AssertionError, self.GL.release,
1750
                      locking.LEVEL_CLUSTER)
1751
    self.GL.release(locking.LEVEL_INSTANCE)
1752
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1753
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1754
    self.assertRaises(AssertionError, self.GL.release,
1755
                      locking.LEVEL_CLUSTER, ['BGL'])
1756
    self.assertRaises(AssertionError, self.GL.release,
1757
                      locking.LEVEL_CLUSTER)
1758
    self.GL.release(locking.LEVEL_NODEGROUP)
1759
    self.GL.release(locking.LEVEL_CLUSTER)
1760

    
1761
  def testWrongOrder(self):
1762
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1763
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1764
    self.assertRaises(AssertionError, self.GL.acquire,
1765
                      locking.LEVEL_NODE, ['n1'])
1766
    self.assertRaises(AssertionError, self.GL.acquire,
1767
                      locking.LEVEL_NODEGROUP, ['g1'])
1768
    self.assertRaises(AssertionError, self.GL.acquire,
1769
                      locking.LEVEL_INSTANCE, ['i2'])
1770

    
1771
  def testModifiableLevels(self):
1772
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1773
                      ['BGL2'])
1774
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1775
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1776
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1777
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1778
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1779
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1780
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1781
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1782
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1783
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1784
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1785
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1786
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1787
                      ['BGL2'])
1788

    
1789
  # Helper function to run as a thread that shared the BGL and then acquires
1790
  # some locks at another level.
1791
  def _doLock(self, level, names, shared):
1792
    try:
1793
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1794
      self.GL.acquire(level, names, shared=shared)
1795
      self.done.put('DONE')
1796
      self.GL.release(level)
1797
      self.GL.release(locking.LEVEL_CLUSTER)
1798
    except errors.LockError:
1799
      self.done.put('ERR')
1800

    
1801
  @_Repeat
1802
  def testConcurrency(self):
1803
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1804
    self._addThread(target=self._doLock,
1805
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1806
    self._waitThreads()
1807
    self.assertEqual(self.done.get_nowait(), 'DONE')
1808
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1809
    self._addThread(target=self._doLock,
1810
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1811
    self._waitThreads()
1812
    self.assertEqual(self.done.get_nowait(), 'DONE')
1813
    self._addThread(target=self._doLock,
1814
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1815
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1816
    self.GL.release(locking.LEVEL_INSTANCE)
1817
    self._waitThreads()
1818
    self.assertEqual(self.done.get_nowait(), 'DONE')
1819
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1820
    self._addThread(target=self._doLock,
1821
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1822
    self._waitThreads()
1823
    self.assertEqual(self.done.get_nowait(), 'DONE')
1824
    self._addThread(target=self._doLock,
1825
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1826
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1827
    self.GL.release(locking.LEVEL_INSTANCE)
1828
    self._waitThreads()
1829
    self.assertEqual(self.done.get(True, 1), 'DONE')
1830
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1831

    
1832

    
1833
class TestLockMonitor(_ThreadedTestCase):
1834
  def setUp(self):
1835
    _ThreadedTestCase.setUp(self)
1836
    self.lm = locking.LockMonitor()
1837

    
1838
  def testSingleThread(self):
1839
    locks = []
1840

    
1841
    for i in range(100):
1842
      name = "TestLock%s" % i
1843
      locks.append(locking.SharedLock(name, monitor=self.lm))
1844

    
1845
    self.assertEqual(len(self.lm._locks), len(locks))
1846
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1847
    self.assertEqual(len(result.fields), 1)
1848
    self.assertEqual(len(result.data), 100)
1849

    
1850
    # Delete all locks
1851
    del locks[:]
1852

    
1853
    # The garbage collector might needs some time
1854
    def _CheckLocks():
1855
      if self.lm._locks:
1856
        raise utils.RetryAgain()
1857

    
1858
    utils.Retry(_CheckLocks, 0.1, 30.0)
1859

    
1860
    self.assertFalse(self.lm._locks)
1861

    
1862
  def testMultiThread(self):
1863
    locks = []
1864

    
1865
    def _CreateLock(prev, next, name):
1866
      prev.wait()
1867
      locks.append(locking.SharedLock(name, monitor=self.lm))
1868
      if next:
1869
        next.set()
1870

    
1871
    expnames = []
1872

    
1873
    first = threading.Event()
1874
    prev = first
1875

    
1876
    # Use a deterministic random generator
1877
    for i in random.Random(4263).sample(range(100), 33):
1878
      name = "MtTestLock%s" % i
1879
      expnames.append(name)
1880

    
1881
      ev = threading.Event()
1882
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1883
      prev = ev
1884

    
1885
    # Add locks
1886
    first.set()
1887
    self._waitThreads()
1888

    
1889
    # Check order in which locks were added
1890
    self.assertEqual([i.name for i in locks], expnames)
1891

    
1892
    # Check query result
1893
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1894
    self.assert_(isinstance(result, dict))
1895
    response = objects.QueryResponse.FromDict(result)
1896
    self.assertEqual(response.data,
1897
                     [[(constants.RS_NORMAL, name),
1898
                       (constants.RS_NORMAL, None),
1899
                       (constants.RS_NORMAL, None),
1900
                       (constants.RS_NORMAL, [])]
1901
                      for name in utils.NiceSort(expnames)])
1902
    self.assertEqual(len(response.fields), 4)
1903
    self.assertEqual(["name", "mode", "owner", "pending"],
1904
                     [fdef.name for fdef in response.fields])
1905

    
1906
    # Test exclusive acquire
1907
    for tlock in locks[::4]:
1908
      tlock.acquire(shared=0)
1909
      try:
1910
        def _GetExpResult(name):
1911
          if tlock.name == name:
1912
            return [(constants.RS_NORMAL, name),
1913
                    (constants.RS_NORMAL, "exclusive"),
1914
                    (constants.RS_NORMAL,
1915
                     [threading.currentThread().getName()]),
1916
                    (constants.RS_NORMAL, [])]
1917
          return [(constants.RS_NORMAL, name),
1918
                  (constants.RS_NORMAL, None),
1919
                  (constants.RS_NORMAL, None),
1920
                  (constants.RS_NORMAL, [])]
1921

    
1922
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1923
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1924
                         [_GetExpResult(name)
1925
                          for name in utils.NiceSort(expnames)])
1926
      finally:
1927
        tlock.release()
1928

    
1929
    # Test shared acquire
1930
    def _Acquire(lock, shared, ev, notify):
1931
      lock.acquire(shared=shared)
1932
      try:
1933
        notify.set()
1934
        ev.wait()
1935
      finally:
1936
        lock.release()
1937

    
1938
    for tlock1 in locks[::11]:
1939
      for tlock2 in locks[::-15]:
1940
        if tlock2 == tlock1:
1941
          # Avoid deadlocks
1942
          continue
1943

    
1944
        for tlock3 in locks[::10]:
1945
          if tlock3 in (tlock2, tlock1):
1946
            # Avoid deadlocks
1947
            continue
1948

    
1949
          releaseev = threading.Event()
1950

    
1951
          # Acquire locks
1952
          acquireev = []
1953
          tthreads1 = []
1954
          for i in range(3):
1955
            ev = threading.Event()
1956
            tthreads1.append(self._addThread(target=_Acquire,
1957
                                             args=(tlock1, 1, releaseev, ev)))
1958
            acquireev.append(ev)
1959

    
1960
          ev = threading.Event()
1961
          tthread2 = self._addThread(target=_Acquire,
1962
                                     args=(tlock2, 1, releaseev, ev))
1963
          acquireev.append(ev)
1964

    
1965
          ev = threading.Event()
1966
          tthread3 = self._addThread(target=_Acquire,
1967
                                     args=(tlock3, 0, releaseev, ev))
1968
          acquireev.append(ev)
1969

    
1970
          # Wait for all locks to be acquired
1971
          for i in acquireev:
1972
            i.wait()
1973

    
1974
          # Check query result
1975
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1976
          response = objects.QueryResponse.FromDict(result)
1977
          for (name, mode, owner) in response.data:
1978
            (name_status, name_value) = name
1979
            (owner_status, owner_value) = owner
1980

    
1981
            self.assertEqual(name_status, constants.RS_NORMAL)
1982
            self.assertEqual(owner_status, constants.RS_NORMAL)
1983

    
1984
            if name_value == tlock1.name:
1985
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1986
              self.assertEqual(set(owner_value),
1987
                               set(i.getName() for i in tthreads1))
1988
              continue
1989

    
1990
            if name_value == tlock2.name:
1991
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1992
              self.assertEqual(owner_value, [tthread2.getName()])
1993
              continue
1994

    
1995
            if name_value == tlock3.name:
1996
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1997
              self.assertEqual(owner_value, [tthread3.getName()])
1998
              continue
1999

    
2000
            self.assert_(name_value in expnames)
2001
            self.assertEqual(mode, (constants.RS_NORMAL, None))
2002
            self.assert_(owner_value is None)
2003

    
2004
          # Release locks again
2005
          releaseev.set()
2006

    
2007
          self._waitThreads()
2008

    
2009
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2010
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
2011
                           [[(constants.RS_NORMAL, name),
2012
                             (constants.RS_NORMAL, None),
2013
                             (constants.RS_NORMAL, None)]
2014
                            for name in utils.NiceSort(expnames)])
2015

    
2016
  def testDelete(self):
2017
    lock = locking.SharedLock("TestLock", monitor=self.lm)
2018

    
2019
    self.assertEqual(len(self.lm._locks), 1)
2020
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2021
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2022
                     [[(constants.RS_NORMAL, lock.name),
2023
                       (constants.RS_NORMAL, None),
2024
                       (constants.RS_NORMAL, None)]])
2025

    
2026
    lock.delete()
2027

    
2028
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2029
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2030
                     [[(constants.RS_NORMAL, lock.name),
2031
                       (constants.RS_NORMAL, "deleted"),
2032
                       (constants.RS_NORMAL, None)]])
2033
    self.assertEqual(len(self.lm._locks), 1)
2034

    
2035
  def testPending(self):
2036
    def _Acquire(lock, shared, prev, next):
2037
      prev.wait()
2038

    
2039
      lock.acquire(shared=shared, test_notify=next.set)
2040
      try:
2041
        pass
2042
      finally:
2043
        lock.release()
2044

    
2045
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
2046

    
2047
    for shared in [0, 1]:
2048
      lock.acquire()
2049
      try:
2050
        self.assertEqual(len(self.lm._locks), 1)
2051
        result = self.lm.QueryLocks(["name", "mode", "owner"])
2052
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2053
                         [[(constants.RS_NORMAL, lock.name),
2054
                           (constants.RS_NORMAL, "exclusive"),
2055
                           (constants.RS_NORMAL,
2056
                            [threading.currentThread().getName()])]])
2057

    
2058
        threads = []
2059

    
2060
        first = threading.Event()
2061
        prev = first
2062

    
2063
        for i in range(5):
2064
          ev = threading.Event()
2065
          threads.append(self._addThread(target=_Acquire,
2066
                                          args=(lock, shared, prev, ev)))
2067
          prev = ev
2068

    
2069
        # Start acquires
2070
        first.set()
2071

    
2072
        # Wait for last acquire to start waiting
2073
        prev.wait()
2074

    
2075
        # NOTE: This works only because QueryLocks will acquire the
2076
        # lock-internal lock again and won't be able to get the information
2077
        # until it has the lock. By then the acquire should be registered in
2078
        # SharedLock.__pending (otherwise it's a bug).
2079

    
2080
        # All acquires are waiting now
2081
        if shared:
2082
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2083
        else:
2084
          pending = [("exclusive", [t.getName()]) for t in threads]
2085

    
2086
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2087
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2088
                         [[(constants.RS_NORMAL, lock.name),
2089
                           (constants.RS_NORMAL, "exclusive"),
2090
                           (constants.RS_NORMAL,
2091
                            [threading.currentThread().getName()]),
2092
                           (constants.RS_NORMAL, pending)]])
2093

    
2094
        self.assertEqual(len(self.lm._locks), 1)
2095
      finally:
2096
        lock.release()
2097

    
2098
      self._waitThreads()
2099

    
2100
      # No pending acquires
2101
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2102
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2103
                       [[(constants.RS_NORMAL, lock.name),
2104
                         (constants.RS_NORMAL, None),
2105
                         (constants.RS_NORMAL, None),
2106
                         (constants.RS_NORMAL, [])]])
2107

    
2108
      self.assertEqual(len(self.lm._locks), 1)
2109

    
2110
  def testDeleteAndRecreate(self):
2111
    lname = "TestLock101923193"
2112

    
2113
    # Create some locks with the same name and keep all references
2114
    locks = [locking.SharedLock(lname, monitor=self.lm)
2115
             for _ in range(5)]
2116

    
2117
    self.assertEqual(len(self.lm._locks), len(locks))
2118

    
2119
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2120
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2121
                     [[(constants.RS_NORMAL, lname),
2122
                       (constants.RS_NORMAL, None),
2123
                       (constants.RS_NORMAL, None)]] * 5)
2124

    
2125
    locks[2].delete()
2126

    
2127
    # Check information order
2128
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2129
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2130
                     [[(constants.RS_NORMAL, lname),
2131
                       (constants.RS_NORMAL, None),
2132
                       (constants.RS_NORMAL, None)]] * 2 +
2133
                     [[(constants.RS_NORMAL, lname),
2134
                       (constants.RS_NORMAL, "deleted"),
2135
                       (constants.RS_NORMAL, None)]] +
2136
                     [[(constants.RS_NORMAL, lname),
2137
                       (constants.RS_NORMAL, None),
2138
                       (constants.RS_NORMAL, None)]] * 2)
2139

    
2140
    locks[1].acquire(shared=0)
2141

    
2142
    last_status = [
2143
      [(constants.RS_NORMAL, lname),
2144
       (constants.RS_NORMAL, None),
2145
       (constants.RS_NORMAL, None)],
2146
      [(constants.RS_NORMAL, lname),
2147
       (constants.RS_NORMAL, "exclusive"),
2148
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
2149
      [(constants.RS_NORMAL, lname),
2150
       (constants.RS_NORMAL, "deleted"),
2151
       (constants.RS_NORMAL, None)],
2152
      [(constants.RS_NORMAL, lname),
2153
       (constants.RS_NORMAL, None),
2154
       (constants.RS_NORMAL, None)],
2155
      [(constants.RS_NORMAL, lname),
2156
       (constants.RS_NORMAL, None),
2157
       (constants.RS_NORMAL, None)],
2158
      ]
2159

    
2160
    # Check information order
2161
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2162
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2163

    
2164
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2165
    self.assertEqual(len(self.lm._locks), len(locks))
2166

    
2167
    # Check lock deletion
2168
    for idx in range(len(locks)):
2169
      del locks[0]
2170
      assert gc.isenabled()
2171
      gc.collect()
2172
      self.assertEqual(len(self.lm._locks), len(locks))
2173
      result = self.lm.QueryLocks(["name", "mode", "owner"])
2174
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2175
                       last_status[idx + 1:])
2176

    
2177
    # All locks should have been deleted
2178
    assert not locks
2179
    self.assertFalse(self.lm._locks)
2180

    
2181
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2182
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2183

    
2184
  class _FakeLock:
2185
    def __init__(self):
2186
      self._info = []
2187

    
2188
    def AddResult(self, *args):
2189
      self._info.append(args)
2190

    
2191
    def CountPending(self):
2192
      return len(self._info)
2193

    
2194
    def GetLockInfo(self, requested):
2195
      (exp_requested, result) = self._info.pop(0)
2196

    
2197
      if exp_requested != requested:
2198
        raise Exception("Requested information (%s) does not match"
2199
                        " expectations (%s)" % (requested, exp_requested))
2200

    
2201
      return result
2202

    
2203
  def testMultipleResults(self):
2204
    fl1 = self._FakeLock()
2205
    fl2 = self._FakeLock()
2206

    
2207
    self.lm.RegisterLock(fl1)
2208
    self.lm.RegisterLock(fl2)
2209

    
2210
    # Empty information
2211
    for i in [fl1, fl2]:
2212
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2213
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2214
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2215
    for i in [fl1, fl2]:
2216
      self.assertEqual(i.CountPending(), 0)
2217

    
2218
    # Check ordering
2219
    for fn in [lambda x: x, reversed, sorted]:
2220
      fl1.AddResult(set(), list(fn([
2221
        ("aaa", None, None, None),
2222
        ("bbb", None, None, None),
2223
        ])))
2224
      fl2.AddResult(set(), [])
2225
      result = self.lm.QueryLocks(["name"])
2226
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2227
        [(constants.RS_NORMAL, "aaa")],
2228
        [(constants.RS_NORMAL, "bbb")],
2229
        ])
2230
      for i in [fl1, fl2]:
2231
        self.assertEqual(i.CountPending(), 0)
2232

    
2233
      for fn2 in [lambda x: x, reversed, sorted]:
2234
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
2235
          # Same name, but different information
2236
          ("aaa", "mode0", None, None),
2237
          ("aaa", "mode1", None, None),
2238
          ("aaa", "mode2", None, None),
2239
          ("aaa", "mode3", None, None),
2240
          ])))
2241
        fl2.AddResult(set([query.LQ_MODE]), [
2242
          ("zzz", "end", None, None),
2243
          ("000", "start", None, None),
2244
          ] + list(fn2([
2245
          ("aaa", "b200", None, None),
2246
          ("aaa", "b300", None, None),
2247
          ])))
2248
        result = self.lm.QueryLocks(["name", "mode"])
2249
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2250
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2251
          ] + list(fn([
2252
          # Name is the same, so order must be equal to incoming order
2253
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2254
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2255
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2256
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2257
          ])) + list(fn2([
2258
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2259
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2260
          ])) + [
2261
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2262
          ])
2263
        for i in [fl1, fl2]:
2264
          self.assertEqual(i.CountPending(), 0)
2265

    
2266

    
2267
if __name__ == '__main__':
2268
  testutils.GanetiTestProgram()