Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 83f2d5f6

History | View | Annotate | Download (66.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190
    # This new thread can't acquire the lock, and thus call wait, before we
191
    # release it
192
    self._addThread(target=fn)
193
    self.cond.notifyAll()
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195
    self.cond.release()
196

    
197
    # We should now get 3 W and 1 A (for the new thread) in whatever order
198
    w = 0
199
    a = 0
200
    for i in range(4):
201
      got = self.done.get(True, 1)
202
      if got == "W":
203
        w += 1
204
      elif got == "A":
205
        a += 1
206
      else:
207
        self.fail("Got %s on the done queue" % got)
208

    
209
    self.assertEqual(w, 3)
210
    self.assertEqual(a, 1)
211

    
212
    self.cond.acquire()
213
    self.cond.notifyAll()
214
    self.cond.release()
215
    self._waitThreads()
216
    self.assertEqual(self.done.get_nowait(), "W")
217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
218

    
219
  def testBlockingWait(self):
220
    def _BlockingWait():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(None)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_BlockingWait)
228

    
229
  def testLongTimeoutWait(self):
230
    def _Helper():
231
      self.cond.acquire()
232
      self.done.put("A")
233
      self.cond.wait(15.0)
234
      self.cond.release()
235
      self.done.put("W")
236

    
237
    self._TestWait(_Helper)
238

    
239
  def _TimeoutWait(self, timeout, check):
240
    self.cond.acquire()
241
    self.cond.wait(timeout)
242
    self.cond.release()
243
    self.done.put(check)
244

    
245
  def testShortTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248
    self._waitThreads()
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertEqual(self.done.get_nowait(), "T1")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253
  def testZeroTimeoutWait(self):
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257
    self._waitThreads()
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertEqual(self.done.get_nowait(), "T0")
261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262

    
263

    
264
class TestSharedLock(_ThreadedTestCase):
265
  """SharedLock tests"""
266

    
267
  def setUp(self):
268
    _ThreadedTestCase.setUp(self)
269
    self.sl = locking.SharedLock("TestSharedLock")
270

    
271
  def testSequenceAndOwnership(self):
272
    self.assertFalse(self.sl._is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl._is_owned())
275
    self.assert_(self.sl._is_owned(shared=1))
276
    self.assertFalse(self.sl._is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl._is_owned())
279
    self.sl.acquire()
280
    self.assert_(self.sl._is_owned())
281
    self.assertFalse(self.sl._is_owned(shared=1))
282
    self.assert_(self.sl._is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl._is_owned())
285
    self.sl.acquire(shared=1)
286
    self.assert_(self.sl._is_owned())
287
    self.assert_(self.sl._is_owned(shared=1))
288
    self.assertFalse(self.sl._is_owned(shared=0))
289
    self.sl.release()
290
    self.assertFalse(self.sl._is_owned())
291

    
292
  def testBooleanValue(self):
293
    # semaphores are supposed to return a true value on a successful acquire
294
    self.assert_(self.sl.acquire(shared=1))
295
    self.sl.release()
296
    self.assert_(self.sl.acquire())
297
    self.sl.release()
298

    
299
  def testDoubleLockingStoE(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  def testDoubleLockingEtoS(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingStoS(self):
308
    self.sl.acquire(shared=1)
309
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310

    
311
  def testDoubleLockingEtoE(self):
312
    self.sl.acquire()
313
    self.assertRaises(AssertionError, self.sl.acquire)
314

    
315
  # helper functions: called in a separate thread they acquire the lock, send
316
  # their identifier on the done queue, then release it.
317
  def _doItSharer(self):
318
    try:
319
      self.sl.acquire(shared=1)
320
      self.done.put('SHR')
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put('ERR')
324

    
325
  def _doItExclusive(self):
326
    try:
327
      self.sl.acquire()
328
      self.done.put('EXC')
329
      self.sl.release()
330
    except errors.LockError:
331
      self.done.put('ERR')
332

    
333
  def _doItDelete(self):
334
    try:
335
      self.sl.delete()
336
      self.done.put('DEL')
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def testSharersCanCoexist(self):
341
    self.sl.acquire(shared=1)
342
    threading.Thread(target=self._doItSharer).start()
343
    self.assert_(self.done.get(True, 1))
344
    self.sl.release()
345

    
346
  @_Repeat
347
  def testExclusiveBlocksExclusive(self):
348
    self.sl.acquire()
349
    self._addThread(target=self._doItExclusive)
350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
351
    self.sl.release()
352
    self._waitThreads()
353
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354

    
355
  @_Repeat
356
  def testExclusiveBlocksDelete(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItDelete)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363
    self.sl = locking.SharedLock(self.sl.name)
364

    
365
  @_Repeat
366
  def testExclusiveBlocksSharer(self):
367
    self.sl.acquire()
368
    self._addThread(target=self._doItSharer)
369
    self.assertRaises(Queue.Empty, self.done.get_nowait)
370
    self.sl.release()
371
    self._waitThreads()
372
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373

    
374
  @_Repeat
375
  def testSharerBlocksExclusive(self):
376
    self.sl.acquire(shared=1)
377
    self._addThread(target=self._doItExclusive)
378
    self.assertRaises(Queue.Empty, self.done.get_nowait)
379
    self.sl.release()
380
    self._waitThreads()
381
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382

    
383
  @_Repeat
384
  def testSharerBlocksDelete(self):
385
    self.sl.acquire(shared=1)
386
    self._addThread(target=self._doItDelete)
387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
388
    self.sl.release()
389
    self._waitThreads()
390
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391
    self.sl = locking.SharedLock(self.sl.name)
392

    
393
  @_Repeat
394
  def testWaitingExclusiveBlocksSharer(self):
395
    """SKIPPED testWaitingExclusiveBlockSharer"""
396
    return
397

    
398
    self.sl.acquire(shared=1)
399
    # the lock is acquired in shared mode...
400
    self._addThread(target=self._doItExclusive)
401
    # ...but now an exclusive is waiting...
402
    self._addThread(target=self._doItSharer)
403
    # ...so the sharer should be blocked as well
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    # The exclusive passed before
408
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410

    
411
  @_Repeat
412
  def testWaitingSharerBlocksExclusive(self):
413
    """SKIPPED testWaitingSharerBlocksExclusive"""
414
    return
415

    
416
    self.sl.acquire()
417
    # the lock is acquired in exclusive mode...
418
    self._addThread(target=self._doItSharer)
419
    # ...but now a sharer is waiting...
420
    self._addThread(target=self._doItExclusive)
421
    # ...the exclusive is waiting too...
422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
423
    self.sl.release()
424
    self._waitThreads()
425
    # The sharer passed before
426
    self.assertEqual(self.done.get_nowait(), 'SHR')
427
    self.assertEqual(self.done.get_nowait(), 'EXC')
428

    
429
  def testDelete(self):
430
    self.sl.delete()
431
    self.assertRaises(errors.LockError, self.sl.acquire)
432
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433
    self.assertRaises(errors.LockError, self.sl.delete)
434

    
435
  def testDeleteTimeout(self):
436
    self.sl.delete(timeout=60)
437

    
438
  def testNoDeleteIfSharer(self):
439
    self.sl.acquire(shared=1)
440
    self.assertRaises(AssertionError, self.sl.delete)
441

    
442
  @_Repeat
443
  def testDeletePendingSharersExclusiveDelete(self):
444
    self.sl.acquire()
445
    self._addThread(target=self._doItSharer)
446
    self._addThread(target=self._doItSharer)
447
    self._addThread(target=self._doItExclusive)
448
    self._addThread(target=self._doItDelete)
449
    self.sl.delete()
450
    self._waitThreads()
451
    # The threads who were pending return ERR
452
    for _ in range(4):
453
      self.assertEqual(self.done.get_nowait(), 'ERR')
454
    self.sl = locking.SharedLock(self.sl.name)
455

    
456
  @_Repeat
457
  def testDeletePendingDeleteExclusiveSharers(self):
458
    self.sl.acquire()
459
    self._addThread(target=self._doItDelete)
460
    self._addThread(target=self._doItExclusive)
461
    self._addThread(target=self._doItSharer)
462
    self._addThread(target=self._doItSharer)
463
    self.sl.delete()
464
    self._waitThreads()
465
    # The two threads who were pending return both ERR
466
    self.assertEqual(self.done.get_nowait(), 'ERR')
467
    self.assertEqual(self.done.get_nowait(), 'ERR')
468
    self.assertEqual(self.done.get_nowait(), 'ERR')
469
    self.assertEqual(self.done.get_nowait(), 'ERR')
470
    self.sl = locking.SharedLock(self.sl.name)
471

    
472
  @_Repeat
473
  def testExclusiveAcquireTimeout(self):
474
    for shared in [0, 1]:
475
      on_queue = threading.Event()
476
      release_exclusive = threading.Event()
477

    
478
      def _LockExclusive():
479
        self.sl.acquire(shared=0, test_notify=on_queue.set)
480
        self.done.put("A: start wait")
481
        release_exclusive.wait()
482
        self.done.put("A: end wait")
483
        self.sl.release()
484

    
485
      # Start thread to hold lock in exclusive mode
486
      self._addThread(target=_LockExclusive)
487

    
488
      # Wait for wait to begin
489
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
490

    
491
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492
      # on the queue
493
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494
                                      test_notify=release_exclusive.set))
495

    
496
      self.done.put("got 2nd")
497
      self.sl.release()
498

    
499
      self._waitThreads()
500

    
501
      self.assertEqual(self.done.get_nowait(), "A: end wait")
502
      self.assertEqual(self.done.get_nowait(), "got 2nd")
503
      self.assertRaises(Queue.Empty, self.done.get_nowait)
504

    
505
  @_Repeat
506
  def testAcquireExpiringTimeout(self):
507
    def _AcquireWithTimeout(shared, timeout):
508
      if not self.sl.acquire(shared=shared, timeout=timeout):
509
        self.done.put("timeout")
510

    
511
    for shared in [0, 1]:
512
      # Lock exclusively
513
      self.sl.acquire()
514

    
515
      # Start shared acquires with timeout between 0 and 20 ms
516
      for i in range(11):
517
        self._addThread(target=_AcquireWithTimeout,
518
                        args=(shared, i * 2.0 / 1000.0))
519

    
520
      # Wait for threads to finish (makes sure the acquire timeout expires
521
      # before releasing the lock)
522
      self._waitThreads()
523

    
524
      # Release lock
525
      self.sl.release()
526

    
527
      for _ in range(11):
528
        self.assertEqual(self.done.get_nowait(), "timeout")
529

    
530
      self.assertRaises(Queue.Empty, self.done.get_nowait)
531

    
532
  @_Repeat
533
  def testSharedSkipExclusiveAcquires(self):
534
    # Tests whether shared acquires jump in front of exclusive acquires in the
535
    # queue.
536

    
537
    def _Acquire(shared, name, notify_ev, wait_ev):
538
      if notify_ev:
539
        notify_fn = notify_ev.set
540
      else:
541
        notify_fn = None
542

    
543
      if wait_ev:
544
        wait_ev.wait()
545

    
546
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547
        return
548

    
549
      self.done.put(name)
550
      self.sl.release()
551

    
552
    # Get exclusive lock while we fill the queue
553
    self.sl.acquire()
554

    
555
    shrcnt1 = 5
556
    shrcnt2 = 7
557
    shrcnt3 = 9
558
    shrcnt4 = 2
559

    
560
    # Add acquires using threading.Event for synchronization. They'll be
561
    # acquired exactly in the order defined in this list.
562
    acquires = (shrcnt1 * [(1, "shared 1")] +
563
                3 * [(0, "exclusive 1")] +
564
                shrcnt2 * [(1, "shared 2")] +
565
                shrcnt3 * [(1, "shared 3")] +
566
                shrcnt4 * [(1, "shared 4")] +
567
                3 * [(0, "exclusive 2")])
568

    
569
    ev_cur = None
570
    ev_prev = None
571

    
572
    for args in acquires:
573
      ev_cur = threading.Event()
574
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575
      ev_prev = ev_cur
576

    
577
    # Wait for last acquire to start
578
    ev_prev.wait()
579

    
580
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
581
    # together
582
    self.assertEqual(self.sl._count_pending(), 7)
583

    
584
    # Release exclusive lock and wait
585
    self.sl.release()
586

    
587
    self._waitThreads()
588

    
589
    # Check sequence
590
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591
      # Shared locks aren't guaranteed to be notified in order, but they'll be
592
      # first
593
      tmp = self.done.get_nowait()
594
      if tmp == "shared 1":
595
        shrcnt1 -= 1
596
      elif tmp == "shared 2":
597
        shrcnt2 -= 1
598
      elif tmp == "shared 3":
599
        shrcnt3 -= 1
600
      elif tmp == "shared 4":
601
        shrcnt4 -= 1
602
    self.assertEqual(shrcnt1, 0)
603
    self.assertEqual(shrcnt2, 0)
604
    self.assertEqual(shrcnt3, 0)
605
    self.assertEqual(shrcnt3, 0)
606

    
607
    for _ in range(3):
608
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
609

    
610
    for _ in range(3):
611
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
612

    
613
    self.assertRaises(Queue.Empty, self.done.get_nowait)
614

    
615
  @_Repeat
616
  def testMixedAcquireTimeout(self):
617
    sync = threading.Event()
618

    
619
    def _AcquireShared(ev):
620
      if not self.sl.acquire(shared=1, timeout=None):
621
        return
622

    
623
      self.done.put("shared")
624

    
625
      # Notify main thread
626
      ev.set()
627

    
628
      # Wait for notification from main thread
629
      sync.wait()
630

    
631
      # Release lock
632
      self.sl.release()
633

    
634
    acquires = []
635
    for _ in range(3):
636
      ev = threading.Event()
637
      self._addThread(target=_AcquireShared, args=(ev, ))
638
      acquires.append(ev)
639

    
640
    # Wait for all acquires to finish
641
    for i in acquires:
642
      i.wait()
643

    
644
    self.assertEqual(self.sl._count_pending(), 0)
645

    
646
    # Try to get exclusive lock
647
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
648

    
649
    # Acquire exclusive without timeout
650
    exclsync = threading.Event()
651
    exclev = threading.Event()
652

    
653
    def _AcquireExclusive():
654
      if not self.sl.acquire(shared=0):
655
        return
656

    
657
      self.done.put("exclusive")
658

    
659
      # Notify main thread
660
      exclev.set()
661

    
662
      # Wait for notification from main thread
663
      exclsync.wait()
664

    
665
      self.sl.release()
666

    
667
    self._addThread(target=_AcquireExclusive)
668

    
669
    # Try to get exclusive lock
670
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
671

    
672
    # Make all shared holders release their locks
673
    sync.set()
674

    
675
    # Wait for exclusive acquire to succeed
676
    exclev.wait()
677

    
678
    self.assertEqual(self.sl._count_pending(), 0)
679

    
680
    # Try to get exclusive lock
681
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
682

    
683
    def _AcquireSharedSimple():
684
      if self.sl.acquire(shared=1, timeout=None):
685
        self.done.put("shared2")
686
        self.sl.release()
687

    
688
    for _ in range(10):
689
      self._addThread(target=_AcquireSharedSimple)
690

    
691
    # Tell exclusive lock to release
692
    exclsync.set()
693

    
694
    # Wait for everything to finish
695
    self._waitThreads()
696

    
697
    self.assertEqual(self.sl._count_pending(), 0)
698

    
699
    # Check sequence
700
    for _ in range(3):
701
      self.assertEqual(self.done.get_nowait(), "shared")
702

    
703
    self.assertEqual(self.done.get_nowait(), "exclusive")
704

    
705
    for _ in range(10):
706
      self.assertEqual(self.done.get_nowait(), "shared2")
707

    
708
    self.assertRaises(Queue.Empty, self.done.get_nowait)
709

    
710
  def testPriority(self):
711
    # Acquire in exclusive mode
712
    self.assert_(self.sl.acquire(shared=0))
713

    
714
    # Queue acquires
715
    def _Acquire(prev, next, shared, priority, result):
716
      prev.wait()
717
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
718
      try:
719
        self.done.put(result)
720
      finally:
721
        self.sl.release()
722

    
723
    counter = itertools.count(0)
724
    priorities = range(-20, 30)
725
    first = threading.Event()
726
    prev = first
727

    
728
    # Data structure:
729
    # {
730
    #   priority:
731
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
732
    #      (shared/exclusive, ...),
733
    #      ...,
734
    #     ],
735
    # }
736
    perprio = {}
737

    
738
    # References shared acquire per priority in L{perprio}. Data structure:
739
    # {
740
    #   priority: (shared=1, set(acquire names), set(pending threads)),
741
    # }
742
    prioshared = {}
743

    
744
    for seed in [4979, 9523, 14902, 32440]:
745
      # Use a deterministic random generator
746
      rnd = random.Random(seed)
747
      for priority in [rnd.choice(priorities) for _ in range(30)]:
748
        modes = [0, 1]
749
        rnd.shuffle(modes)
750
        for shared in modes:
751
          # Unique name
752
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
753

    
754
          ev = threading.Event()
755
          thread = self._addThread(target=_Acquire,
756
                                   args=(prev, ev, shared, priority, acqname))
757
          prev = ev
758

    
759
          # Record expected aqcuire, see above for structure
760
          data = (shared, set([acqname]), set([thread]))
761
          priolist = perprio.setdefault(priority, [])
762
          if shared:
763
            priosh = prioshared.get(priority, None)
764
            if priosh:
765
              # Shared acquires are merged
766
              for i, j in zip(priosh[1:], data[1:]):
767
                i.update(j)
768
              assert data[0] == priosh[0]
769
            else:
770
              prioshared[priority] = data
771
              priolist.append(data)
772
          else:
773
            priolist.append(data)
774

    
775
    # Start all acquires and wait for them
776
    first.set()
777
    prev.wait()
778

    
779
    # Check lock information
780
    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
781
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
782
                     (self.sl.name, "exclusive",
783
                      [threading.currentThread().getName()], None))
784

    
785
    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
786

    
787
    # Let threads acquire the lock
788
    self.sl.release()
789

    
790
    # Wait for everything to finish
791
    self._waitThreads()
792

    
793
    self.assert_(self.sl._check_empty())
794

    
795
    # Check acquires by priority
796
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797
      for (_, names, _) in acquires:
798
        # For shared acquires, the set will contain 1..n entries. For exclusive
799
        # acquires only one.
800
        while names:
801
          names.remove(self.done.get_nowait())
802
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
803

    
804
    self.assertRaises(Queue.Empty, self.done.get_nowait)
805

    
806
  def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
807
    self.assertEqual(name, self.sl.name)
808
    self.assert_(mode is None)
809
    self.assert_(owner is None)
810

    
811
    self.assertEqual([(pendmode, sorted(waiting))
812
                      for (pendmode, waiting) in pending],
813
                     [(["exclusive", "shared"][int(bool(shared))],
814
                       sorted(t.getName() for t in threads))
815
                      for acquires in [perprio[i]
816
                                       for i in sorted(perprio.keys())]
817
                      for (shared, _, threads) in acquires])
818

    
819

    
820
class TestSharedLockInCondition(_ThreadedTestCase):
821
  """SharedLock as a condition lock tests"""
822

    
823
  def setUp(self):
824
    _ThreadedTestCase.setUp(self)
825
    self.sl = locking.SharedLock("TestSharedLockInCondition")
826
    self.setCondition()
827

    
828
  def setCondition(self):
829
    self.cond = threading.Condition(self.sl)
830

    
831
  def testKeepMode(self):
832
    self.cond.acquire(shared=1)
833
    self.assert_(self.sl._is_owned(shared=1))
834
    self.cond.wait(0)
835
    self.assert_(self.sl._is_owned(shared=1))
836
    self.cond.release()
837
    self.cond.acquire(shared=0)
838
    self.assert_(self.sl._is_owned(shared=0))
839
    self.cond.wait(0)
840
    self.assert_(self.sl._is_owned(shared=0))
841
    self.cond.release()
842

    
843

    
844
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
845
  """SharedLock as a pipe condition lock tests"""
846

    
847
  def setCondition(self):
848
    self.cond = locking.PipeCondition(self.sl)
849

    
850

    
851
class TestSSynchronizedDecorator(_ThreadedTestCase):
852
  """Shared Lock Synchronized decorator test"""
853

    
854
  def setUp(self):
855
    _ThreadedTestCase.setUp(self)
856

    
857
  @locking.ssynchronized(_decoratorlock)
858
  def _doItExclusive(self):
859
    self.assert_(_decoratorlock._is_owned())
860
    self.done.put('EXC')
861

    
862
  @locking.ssynchronized(_decoratorlock, shared=1)
863
  def _doItSharer(self):
864
    self.assert_(_decoratorlock._is_owned(shared=1))
865
    self.done.put('SHR')
866

    
867
  def testDecoratedFunctions(self):
868
    self._doItExclusive()
869
    self.assertFalse(_decoratorlock._is_owned())
870
    self._doItSharer()
871
    self.assertFalse(_decoratorlock._is_owned())
872

    
873
  def testSharersCanCoexist(self):
874
    _decoratorlock.acquire(shared=1)
875
    threading.Thread(target=self._doItSharer).start()
876
    self.assert_(self.done.get(True, 1))
877
    _decoratorlock.release()
878

    
879
  @_Repeat
880
  def testExclusiveBlocksExclusive(self):
881
    _decoratorlock.acquire()
882
    self._addThread(target=self._doItExclusive)
883
    # give it a bit of time to check that it's not actually doing anything
884
    self.assertRaises(Queue.Empty, self.done.get_nowait)
885
    _decoratorlock.release()
886
    self._waitThreads()
887
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
888

    
889
  @_Repeat
890
  def testExclusiveBlocksSharer(self):
891
    _decoratorlock.acquire()
892
    self._addThread(target=self._doItSharer)
893
    self.assertRaises(Queue.Empty, self.done.get_nowait)
894
    _decoratorlock.release()
895
    self._waitThreads()
896
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
897

    
898
  @_Repeat
899
  def testSharerBlocksExclusive(self):
900
    _decoratorlock.acquire(shared=1)
901
    self._addThread(target=self._doItExclusive)
902
    self.assertRaises(Queue.Empty, self.done.get_nowait)
903
    _decoratorlock.release()
904
    self._waitThreads()
905
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
906

    
907

    
908
class TestLockSet(_ThreadedTestCase):
909
  """LockSet tests"""
910

    
911
  def setUp(self):
912
    _ThreadedTestCase.setUp(self)
913
    self._setUpLS()
914

    
915
  def _setUpLS(self):
916
    """Helper to (re)initialize the lock set"""
917
    self.resources = ['one', 'two', 'three']
918
    self.ls = locking.LockSet(self.resources, "TestLockSet")
919

    
920
  def testResources(self):
921
    self.assertEquals(self.ls._names(), set(self.resources))
922
    newls = locking.LockSet([], "TestLockSet.testResources")
923
    self.assertEquals(newls._names(), set())
924

    
925
  def testAcquireRelease(self):
926
    self.assert_(self.ls.acquire('one'))
927
    self.assertEquals(self.ls._list_owned(), set(['one']))
928
    self.ls.release()
929
    self.assertEquals(self.ls._list_owned(), set())
930
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
931
    self.assertEquals(self.ls._list_owned(), set(['one']))
932
    self.ls.release()
933
    self.assertEquals(self.ls._list_owned(), set())
934
    self.ls.acquire(['one', 'two', 'three'])
935
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
936
    self.ls.release('one')
937
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
938
    self.ls.release(['three'])
939
    self.assertEquals(self.ls._list_owned(), set(['two']))
940
    self.ls.release()
941
    self.assertEquals(self.ls._list_owned(), set())
942
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
943
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
944
    self.ls.release()
945
    self.assertEquals(self.ls._list_owned(), set())
946

    
947
  def testNoDoubleAcquire(self):
948
    self.ls.acquire('one')
949
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
950
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
951
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
952
    self.ls.release()
953
    self.ls.acquire(['one', 'three'])
954
    self.ls.release('one')
955
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
956
    self.ls.release('three')
957

    
958
  def testNoWrongRelease(self):
959
    self.assertRaises(AssertionError, self.ls.release)
960
    self.ls.acquire('one')
961
    self.assertRaises(AssertionError, self.ls.release, 'two')
962

    
963
  def testAddRemove(self):
964
    self.ls.add('four')
965
    self.assertEquals(self.ls._list_owned(), set())
966
    self.assert_('four' in self.ls._names())
967
    self.ls.add(['five', 'six', 'seven'], acquired=1)
968
    self.assert_('five' in self.ls._names())
969
    self.assert_('six' in self.ls._names())
970
    self.assert_('seven' in self.ls._names())
971
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
972
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
973
    self.assert_('five' not in self.ls._names())
974
    self.assert_('six' not in self.ls._names())
975
    self.assertEquals(self.ls._list_owned(), set(['seven']))
976
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
977
    self.ls.remove('seven')
978
    self.assert_('seven' not in self.ls._names())
979
    self.assertEquals(self.ls._list_owned(), set([]))
980
    self.ls.acquire(None, shared=1)
981
    self.assertRaises(AssertionError, self.ls.add, 'eight')
982
    self.ls.release()
983
    self.ls.acquire(None)
984
    self.ls.add('eight', acquired=1)
985
    self.assert_('eight' in self.ls._names())
986
    self.assert_('eight' in self.ls._list_owned())
987
    self.ls.add('nine')
988
    self.assert_('nine' in self.ls._names())
989
    self.assert_('nine' not in self.ls._list_owned())
990
    self.ls.release()
991
    self.ls.remove(['two'])
992
    self.assert_('two' not in self.ls._names())
993
    self.ls.acquire('three')
994
    self.assertEquals(self.ls.remove(['three']), ['three'])
995
    self.assert_('three' not in self.ls._names())
996
    self.assertEquals(self.ls.remove('three'), [])
997
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
998
    self.assert_('one' not in self.ls._names())
999

    
1000
  def testRemoveNonBlocking(self):
1001
    self.ls.acquire('one')
1002
    self.assertEquals(self.ls.remove('one'), ['one'])
1003
    self.ls.acquire(['two', 'three'])
1004
    self.assertEquals(self.ls.remove(['two', 'three']),
1005
                      ['two', 'three'])
1006

    
1007
  def testNoDoubleAdd(self):
1008
    self.assertRaises(errors.LockError, self.ls.add, 'two')
1009
    self.ls.add('four')
1010
    self.assertRaises(errors.LockError, self.ls.add, 'four')
1011

    
1012
  def testNoWrongRemoves(self):
1013
    self.ls.acquire(['one', 'three'], shared=1)
1014
    # Cannot remove 'two' while holding something which is not a superset
1015
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1016
    # Cannot remove 'three' as we are sharing it
1017
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1018

    
1019
  def testAcquireSetLock(self):
1020
    # acquire the set-lock exclusively
1021
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1022
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1023
    self.assertEquals(self.ls._is_owned(), True)
1024
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1025
    # I can still add/remove elements...
1026
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1027
    self.assert_(self.ls.add('six'))
1028
    self.ls.release()
1029
    # share the set-lock
1030
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1031
    # adding new elements is not possible
1032
    self.assertRaises(AssertionError, self.ls.add, 'five')
1033
    self.ls.release()
1034

    
1035
  def testAcquireWithRepetitions(self):
1036
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1037
                      set(['two', 'two', 'three']))
1038
    self.ls.release(['two', 'two'])
1039
    self.assertEquals(self.ls._list_owned(), set(['three']))
1040

    
1041
  def testEmptyAcquire(self):
1042
    # Acquire an empty list of locks...
1043
    self.assertEquals(self.ls.acquire([]), set())
1044
    self.assertEquals(self.ls._list_owned(), set())
1045
    # New locks can still be addded
1046
    self.assert_(self.ls.add('six'))
1047
    # "re-acquiring" is not an issue, since we had really acquired nothing
1048
    self.assertEquals(self.ls.acquire([], shared=1), set())
1049
    self.assertEquals(self.ls._list_owned(), set())
1050
    # We haven't really acquired anything, so we cannot release
1051
    self.assertRaises(AssertionError, self.ls.release)
1052

    
1053
  def _doLockSet(self, names, shared):
1054
    try:
1055
      self.ls.acquire(names, shared=shared)
1056
      self.done.put('DONE')
1057
      self.ls.release()
1058
    except errors.LockError:
1059
      self.done.put('ERR')
1060

    
1061
  def _doAddSet(self, names):
1062
    try:
1063
      self.ls.add(names, acquired=1)
1064
      self.done.put('DONE')
1065
      self.ls.release()
1066
    except errors.LockError:
1067
      self.done.put('ERR')
1068

    
1069
  def _doRemoveSet(self, names):
1070
    self.done.put(self.ls.remove(names))
1071

    
1072
  @_Repeat
1073
  def testConcurrentSharedAcquire(self):
1074
    self.ls.acquire(['one', 'two'], shared=1)
1075
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1076
    self._waitThreads()
1077
    self.assertEqual(self.done.get_nowait(), 'DONE')
1078
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1079
    self._waitThreads()
1080
    self.assertEqual(self.done.get_nowait(), 'DONE')
1081
    self._addThread(target=self._doLockSet, args=('three', 1))
1082
    self._waitThreads()
1083
    self.assertEqual(self.done.get_nowait(), 'DONE')
1084
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1085
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1086
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1087
    self.ls.release()
1088
    self._waitThreads()
1089
    self.assertEqual(self.done.get_nowait(), 'DONE')
1090
    self.assertEqual(self.done.get_nowait(), 'DONE')
1091

    
1092
  @_Repeat
1093
  def testConcurrentExclusiveAcquire(self):
1094
    self.ls.acquire(['one', 'two'])
1095
    self._addThread(target=self._doLockSet, args=('three', 1))
1096
    self._waitThreads()
1097
    self.assertEqual(self.done.get_nowait(), 'DONE')
1098
    self._addThread(target=self._doLockSet, args=('three', 0))
1099
    self._waitThreads()
1100
    self.assertEqual(self.done.get_nowait(), 'DONE')
1101
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1102
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1103
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1104
    self._addThread(target=self._doLockSet, args=('one', 0))
1105
    self._addThread(target=self._doLockSet, args=('one', 1))
1106
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1107
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1108
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1109
    self.ls.release()
1110
    self._waitThreads()
1111
    for _ in range(6):
1112
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1113

    
1114
  @_Repeat
1115
  def testSimpleAcquireTimeoutExpiring(self):
1116
    names = sorted(self.ls._names())
1117
    self.assert_(len(names) >= 3)
1118

    
1119
    # Get name of first lock
1120
    first = names[0]
1121

    
1122
    # Get name of last lock
1123
    last = names.pop()
1124

    
1125
    checks = [
1126
      # Block first and try to lock it again
1127
      (first, first),
1128

    
1129
      # Block last and try to lock all locks
1130
      (None, first),
1131

    
1132
      # Block last and try to lock it again
1133
      (last, last),
1134
      ]
1135

    
1136
    for (wanted, block) in checks:
1137
      # Lock in exclusive mode
1138
      self.assert_(self.ls.acquire(block, shared=0))
1139

    
1140
      def _AcquireOne():
1141
        # Try to get the same lock again with a timeout (should never succeed)
1142
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1143
        if acquired:
1144
          self.done.put("acquired")
1145
          self.ls.release()
1146
        else:
1147
          self.assert_(acquired is None)
1148
          self.assertFalse(self.ls._list_owned())
1149
          self.assertFalse(self.ls._is_owned())
1150
          self.done.put("not acquired")
1151

    
1152
      self._addThread(target=_AcquireOne)
1153

    
1154
      # Wait for timeout in thread to expire
1155
      self._waitThreads()
1156

    
1157
      # Release exclusive lock again
1158
      self.ls.release()
1159

    
1160
      self.assertEqual(self.done.get_nowait(), "not acquired")
1161
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1162

    
1163
  @_Repeat
1164
  def testDelayedAndExpiringLockAcquire(self):
1165
    self._setUpLS()
1166
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1167

    
1168
    for expire in (False, True):
1169
      names = sorted(self.ls._names())
1170
      self.assertEqual(len(names), 8)
1171

    
1172
      lock_ev = dict([(i, threading.Event()) for i in names])
1173

    
1174
      # Lock all in exclusive mode
1175
      self.assert_(self.ls.acquire(names, shared=0))
1176

    
1177
      if expire:
1178
        # We'll wait at least 300ms per lock
1179
        lockwait = len(names) * [0.3]
1180

    
1181
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1182
        # this gives us up to 2.4s to fail.
1183
        lockall_timeout = 0.4
1184
      else:
1185
        # This should finish rather quickly
1186
        lockwait = None
1187
        lockall_timeout = len(names) * 5.0
1188

    
1189
      def _LockAll():
1190
        def acquire_notification(name):
1191
          if not expire:
1192
            self.done.put("getting %s" % name)
1193

    
1194
          # Kick next lock
1195
          lock_ev[name].set()
1196

    
1197
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1198
                           test_notify=acquire_notification):
1199
          self.done.put("got all")
1200
          self.ls.release()
1201
        else:
1202
          self.done.put("timeout on all")
1203

    
1204
        # Notify all locks
1205
        for ev in lock_ev.values():
1206
          ev.set()
1207

    
1208
      t = self._addThread(target=_LockAll)
1209

    
1210
      for idx, name in enumerate(names):
1211
        # Wait for actual acquire on this lock to start
1212
        lock_ev[name].wait(10.0)
1213

    
1214
        if expire and t.isAlive():
1215
          # Wait some time after getting the notification to make sure the lock
1216
          # acquire will expire
1217
          SafeSleep(lockwait[idx])
1218

    
1219
        self.ls.release(names=name)
1220

    
1221
      self.assertFalse(self.ls._list_owned())
1222

    
1223
      self._waitThreads()
1224

    
1225
      if expire:
1226
        # Not checking which locks were actually acquired. Doing so would be
1227
        # too timing-dependant.
1228
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1229
      else:
1230
        for i in names:
1231
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1232
        self.assertEqual(self.done.get_nowait(), "got all")
1233
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1234

    
1235
  @_Repeat
1236
  def testConcurrentRemove(self):
1237
    self.ls.add('four')
1238
    self.ls.acquire(['one', 'two', 'four'])
1239
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1240
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1241
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1242
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1243
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1244
    self.ls.remove('one')
1245
    self.ls.release()
1246
    self._waitThreads()
1247
    for i in range(4):
1248
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1249
    self.ls.add(['five', 'six'], acquired=1)
1250
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1251
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1252
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1253
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1254
    self.ls.remove('five')
1255
    self.ls.release()
1256
    self._waitThreads()
1257
    for i in range(4):
1258
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1259
    self.ls.acquire(['three', 'four'])
1260
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1262
    self.ls.remove('four')
1263
    self._waitThreads()
1264
    self.assertEqual(self.done.get_nowait(), ['six'])
1265
    self._addThread(target=self._doRemoveSet, args=(['two']))
1266
    self._waitThreads()
1267
    self.assertEqual(self.done.get_nowait(), ['two'])
1268
    self.ls.release()
1269
    # reset lockset
1270
    self._setUpLS()
1271

    
1272
  @_Repeat
1273
  def testConcurrentSharedSetLock(self):
1274
    # share the set-lock...
1275
    self.ls.acquire(None, shared=1)
1276
    # ...another thread can share it too
1277
    self._addThread(target=self._doLockSet, args=(None, 1))
1278
    self._waitThreads()
1279
    self.assertEqual(self.done.get_nowait(), 'DONE')
1280
    # ...or just share some elements
1281
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1282
    self._waitThreads()
1283
    self.assertEqual(self.done.get_nowait(), 'DONE')
1284
    # ...but not add new ones or remove any
1285
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1286
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1287
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1288
    # this just releases the set-lock
1289
    self.ls.release([])
1290
    t.join(60)
1291
    self.assertEqual(self.done.get_nowait(), 'DONE')
1292
    # release the lock on the actual elements so remove() can proceed too
1293
    self.ls.release()
1294
    self._waitThreads()
1295
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1296
    # reset lockset
1297
    self._setUpLS()
1298

    
1299
  @_Repeat
1300
  def testConcurrentExclusiveSetLock(self):
1301
    # acquire the set-lock...
1302
    self.ls.acquire(None, shared=0)
1303
    # ...no one can do anything else
1304
    self._addThread(target=self._doLockSet, args=(None, 1))
1305
    self._addThread(target=self._doLockSet, args=(None, 0))
1306
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1307
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1308
    self._addThread(target=self._doAddSet, args=(['nine']))
1309
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1310
    self.ls.release()
1311
    self._waitThreads()
1312
    for _ in range(5):
1313
      self.assertEqual(self.done.get(True, 1), 'DONE')
1314
    # cleanup
1315
    self._setUpLS()
1316

    
1317
  @_Repeat
1318
  def testConcurrentSetLockAdd(self):
1319
    self.ls.acquire('one')
1320
    # Another thread wants the whole SetLock
1321
    self._addThread(target=self._doLockSet, args=(None, 0))
1322
    self._addThread(target=self._doLockSet, args=(None, 1))
1323
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1324
    self.assertRaises(AssertionError, self.ls.add, 'four')
1325
    self.ls.release()
1326
    self._waitThreads()
1327
    self.assertEqual(self.done.get_nowait(), 'DONE')
1328
    self.assertEqual(self.done.get_nowait(), 'DONE')
1329
    self.ls.acquire(None)
1330
    self._addThread(target=self._doLockSet, args=(None, 0))
1331
    self._addThread(target=self._doLockSet, args=(None, 1))
1332
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1333
    self.ls.add('four')
1334
    self.ls.add('five', acquired=1)
1335
    self.ls.add('six', acquired=1, shared=1)
1336
    self.assertEquals(self.ls._list_owned(),
1337
      set(['one', 'two', 'three', 'five', 'six']))
1338
    self.assertEquals(self.ls._is_owned(), True)
1339
    self.assertEquals(self.ls._names(),
1340
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1341
    self.ls.release()
1342
    self._waitThreads()
1343
    self.assertEqual(self.done.get_nowait(), 'DONE')
1344
    self.assertEqual(self.done.get_nowait(), 'DONE')
1345
    self._setUpLS()
1346

    
1347
  @_Repeat
1348
  def testEmptyLockSet(self):
1349
    # get the set-lock
1350
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1351
    # now empty it...
1352
    self.ls.remove(['one', 'two', 'three'])
1353
    # and adds/locks by another thread still wait
1354
    self._addThread(target=self._doAddSet, args=(['nine']))
1355
    self._addThread(target=self._doLockSet, args=(None, 1))
1356
    self._addThread(target=self._doLockSet, args=(None, 0))
1357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1358
    self.ls.release()
1359
    self._waitThreads()
1360
    for _ in range(3):
1361
      self.assertEqual(self.done.get_nowait(), 'DONE')
1362
    # empty it again...
1363
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1364
    # now share it...
1365
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1366
    # other sharers can go, adds still wait
1367
    self._addThread(target=self._doLockSet, args=(None, 1))
1368
    self._waitThreads()
1369
    self.assertEqual(self.done.get_nowait(), 'DONE')
1370
    self._addThread(target=self._doAddSet, args=(['nine']))
1371
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1372
    self.ls.release()
1373
    self._waitThreads()
1374
    self.assertEqual(self.done.get_nowait(), 'DONE')
1375
    self._setUpLS()
1376

    
1377
  def testPriority(self):
1378
    def _Acquire(prev, next, name, priority, success_fn):
1379
      prev.wait()
1380
      self.assert_(self.ls.acquire(name, shared=0,
1381
                                   priority=priority,
1382
                                   test_notify=lambda _: next.set()))
1383
      try:
1384
        success_fn()
1385
      finally:
1386
        self.ls.release()
1387

    
1388
    # Get all in exclusive mode
1389
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1390

    
1391
    done_two = Queue.Queue(0)
1392

    
1393
    first = threading.Event()
1394
    prev = first
1395

    
1396
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1397
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1398

    
1399
    # Use a deterministic random generator
1400
    random.Random(741).shuffle(acquires)
1401

    
1402
    for (name, prio, done) in acquires:
1403
      ev = threading.Event()
1404
      self._addThread(target=_Acquire,
1405
                      args=(prev, ev, name, prio,
1406
                            compat.partial(done.put, "Prio%s" % prio)))
1407
      prev = ev
1408

    
1409
    # Start acquires
1410
    first.set()
1411

    
1412
    # Wait for last acquire to start
1413
    prev.wait()
1414

    
1415
    # Let threads acquire locks
1416
    self.ls.release()
1417

    
1418
    # Wait for threads to finish
1419
    self._waitThreads()
1420

    
1421
    for i in range(1, 33):
1422
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1423
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1424

    
1425
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1426
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1427

    
1428

    
1429
class TestGanetiLockManager(_ThreadedTestCase):
1430

    
1431
  def setUp(self):
1432
    _ThreadedTestCase.setUp(self)
1433
    self.nodes=['n1', 'n2']
1434
    self.nodegroups=['g1', 'g2']
1435
    self.instances=['i1', 'i2', 'i3']
1436
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1437
                                        self.instances)
1438

    
1439
  def tearDown(self):
1440
    # Don't try this at home...
1441
    locking.GanetiLockManager._instance = None
1442

    
1443
  def testLockingConstants(self):
1444
    # The locking library internally cheats by assuming its constants have some
1445
    # relationships with each other. Check those hold true.
1446
    # This relationship is also used in the Processor to recursively acquire
1447
    # the right locks. Again, please don't break it.
1448
    for i in range(len(locking.LEVELS)):
1449
      self.assertEqual(i, locking.LEVELS[i])
1450

    
1451
  def testDoubleGLFails(self):
1452
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1453

    
1454
  def testLockNames(self):
1455
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1456
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1457
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1458
                     set(self.nodegroups))
1459
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1460
                     set(self.instances))
1461

    
1462
  def testInitAndResources(self):
1463
    locking.GanetiLockManager._instance = None
1464
    self.GL = locking.GanetiLockManager([], [], [])
1465
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1466
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1467
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1468
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1469

    
1470
    locking.GanetiLockManager._instance = None
1471
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1472
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1473
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1474
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1475
                                    set(self.nodegroups))
1476
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1477

    
1478
    locking.GanetiLockManager._instance = None
1479
    self.GL = locking.GanetiLockManager([], [], self.instances)
1480
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1481
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1482
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1483
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1484
                     set(self.instances))
1485

    
1486
  def testAcquireRelease(self):
1487
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1488
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1489
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1490
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1491
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1492
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1493
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1494
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1495
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1496
    self.GL.release(locking.LEVEL_NODE)
1497
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1498
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1499
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1500
    self.GL.release(locking.LEVEL_NODEGROUP)
1501
    self.GL.release(locking.LEVEL_INSTANCE)
1502
    self.assertRaises(errors.LockError, self.GL.acquire,
1503
                      locking.LEVEL_INSTANCE, ['i5'])
1504
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1505
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1506

    
1507
  def testAcquireWholeSets(self):
1508
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1509
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1510
                      set(self.instances))
1511
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1512
                      set(self.instances))
1513
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1514
                      set(self.nodegroups))
1515
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1516
                      set(self.nodegroups))
1517
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1518
                      set(self.nodes))
1519
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1520
                      set(self.nodes))
1521
    self.GL.release(locking.LEVEL_NODE)
1522
    self.GL.release(locking.LEVEL_NODEGROUP)
1523
    self.GL.release(locking.LEVEL_INSTANCE)
1524
    self.GL.release(locking.LEVEL_CLUSTER)
1525

    
1526
  def testAcquireWholeAndPartial(self):
1527
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1528
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1529
                      set(self.instances))
1530
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1531
                      set(self.instances))
1532
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1533
                      set(['n2']))
1534
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1535
                      set(['n2']))
1536
    self.GL.release(locking.LEVEL_NODE)
1537
    self.GL.release(locking.LEVEL_INSTANCE)
1538
    self.GL.release(locking.LEVEL_CLUSTER)
1539

    
1540
  def testBGLDependency(self):
1541
    self.assertRaises(AssertionError, self.GL.acquire,
1542
                      locking.LEVEL_NODE, ['n1', 'n2'])
1543
    self.assertRaises(AssertionError, self.GL.acquire,
1544
                      locking.LEVEL_INSTANCE, ['i3'])
1545
    self.assertRaises(AssertionError, self.GL.acquire,
1546
                      locking.LEVEL_NODEGROUP, ['g1'])
1547
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1548
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1549
    self.assertRaises(AssertionError, self.GL.release,
1550
                      locking.LEVEL_CLUSTER, ['BGL'])
1551
    self.assertRaises(AssertionError, self.GL.release,
1552
                      locking.LEVEL_CLUSTER)
1553
    self.GL.release(locking.LEVEL_NODE)
1554
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1555
    self.assertRaises(AssertionError, self.GL.release,
1556
                      locking.LEVEL_CLUSTER, ['BGL'])
1557
    self.assertRaises(AssertionError, self.GL.release,
1558
                      locking.LEVEL_CLUSTER)
1559
    self.GL.release(locking.LEVEL_INSTANCE)
1560
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1561
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1562
    self.assertRaises(AssertionError, self.GL.release,
1563
                      locking.LEVEL_CLUSTER, ['BGL'])
1564
    self.assertRaises(AssertionError, self.GL.release,
1565
                      locking.LEVEL_CLUSTER)
1566
    self.GL.release(locking.LEVEL_NODEGROUP)
1567
    self.GL.release(locking.LEVEL_CLUSTER)
1568

    
1569
  def testWrongOrder(self):
1570
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1571
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1572
    self.assertRaises(AssertionError, self.GL.acquire,
1573
                      locking.LEVEL_NODE, ['n1'])
1574
    self.assertRaises(AssertionError, self.GL.acquire,
1575
                      locking.LEVEL_NODEGROUP, ['g1'])
1576
    self.assertRaises(AssertionError, self.GL.acquire,
1577
                      locking.LEVEL_INSTANCE, ['i2'])
1578

    
1579
  def testModifiableLevels(self):
1580
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1581
                      ['BGL2'])
1582
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1583
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1584
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1585
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1586
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1587
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1588
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1589
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1590
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1591
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1592
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1593
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1594
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1595
                      ['BGL2'])
1596

    
1597
  # Helper function to run as a thread that shared the BGL and then acquires
1598
  # some locks at another level.
1599
  def _doLock(self, level, names, shared):
1600
    try:
1601
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1602
      self.GL.acquire(level, names, shared=shared)
1603
      self.done.put('DONE')
1604
      self.GL.release(level)
1605
      self.GL.release(locking.LEVEL_CLUSTER)
1606
    except errors.LockError:
1607
      self.done.put('ERR')
1608

    
1609
  @_Repeat
1610
  def testConcurrency(self):
1611
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1612
    self._addThread(target=self._doLock,
1613
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1614
    self._waitThreads()
1615
    self.assertEqual(self.done.get_nowait(), 'DONE')
1616
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1617
    self._addThread(target=self._doLock,
1618
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1619
    self._waitThreads()
1620
    self.assertEqual(self.done.get_nowait(), 'DONE')
1621
    self._addThread(target=self._doLock,
1622
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1623
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1624
    self.GL.release(locking.LEVEL_INSTANCE)
1625
    self._waitThreads()
1626
    self.assertEqual(self.done.get_nowait(), 'DONE')
1627
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1628
    self._addThread(target=self._doLock,
1629
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1630
    self._waitThreads()
1631
    self.assertEqual(self.done.get_nowait(), 'DONE')
1632
    self._addThread(target=self._doLock,
1633
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1634
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1635
    self.GL.release(locking.LEVEL_INSTANCE)
1636
    self._waitThreads()
1637
    self.assertEqual(self.done.get(True, 1), 'DONE')
1638
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1639

    
1640

    
1641
class TestLockMonitor(_ThreadedTestCase):
1642
  def setUp(self):
1643
    _ThreadedTestCase.setUp(self)
1644
    self.lm = locking.LockMonitor()
1645

    
1646
  def testSingleThread(self):
1647
    locks = []
1648

    
1649
    for i in range(100):
1650
      name = "TestLock%s" % i
1651
      locks.append(locking.SharedLock(name, monitor=self.lm))
1652

    
1653
    self.assertEqual(len(self.lm._locks), len(locks))
1654
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1655
    self.assertEqual(len(result.fields), 1)
1656
    self.assertEqual(len(result.data), 100)
1657

    
1658
    # Delete all locks
1659
    del locks[:]
1660

    
1661
    # The garbage collector might needs some time
1662
    def _CheckLocks():
1663
      if self.lm._locks:
1664
        raise utils.RetryAgain()
1665

    
1666
    utils.Retry(_CheckLocks, 0.1, 30.0)
1667

    
1668
    self.assertFalse(self.lm._locks)
1669

    
1670
  def testMultiThread(self):
1671
    locks = []
1672

    
1673
    def _CreateLock(prev, next, name):
1674
      prev.wait()
1675
      locks.append(locking.SharedLock(name, monitor=self.lm))
1676
      if next:
1677
        next.set()
1678

    
1679
    expnames = []
1680

    
1681
    first = threading.Event()
1682
    prev = first
1683

    
1684
    # Use a deterministic random generator
1685
    for i in random.Random(4263).sample(range(100), 33):
1686
      name = "MtTestLock%s" % i
1687
      expnames.append(name)
1688

    
1689
      ev = threading.Event()
1690
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1691
      prev = ev
1692

    
1693
    # Add locks
1694
    first.set()
1695
    self._waitThreads()
1696

    
1697
    # Check order in which locks were added
1698
    self.assertEqual([i.name for i in locks], expnames)
1699

    
1700
    # Check query result
1701
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1702
    self.assert_(isinstance(result, dict))
1703
    response = objects.QueryResponse.FromDict(result)
1704
    self.assertEqual(response.data,
1705
                     [[(constants.RS_NORMAL, name),
1706
                       (constants.RS_NORMAL, None),
1707
                       (constants.RS_NORMAL, None),
1708
                       (constants.RS_NORMAL, [])]
1709
                      for name in utils.NiceSort(expnames)])
1710
    self.assertEqual(len(response.fields), 4)
1711
    self.assertEqual(["name", "mode", "owner", "pending"],
1712
                     [fdef.name for fdef in response.fields])
1713

    
1714
    # Test exclusive acquire
1715
    for tlock in locks[::4]:
1716
      tlock.acquire(shared=0)
1717
      try:
1718
        def _GetExpResult(name):
1719
          if tlock.name == name:
1720
            return [(constants.RS_NORMAL, name),
1721
                    (constants.RS_NORMAL, "exclusive"),
1722
                    (constants.RS_NORMAL,
1723
                     [threading.currentThread().getName()]),
1724
                    (constants.RS_NORMAL, [])]
1725
          return [(constants.RS_NORMAL, name),
1726
                  (constants.RS_NORMAL, None),
1727
                  (constants.RS_NORMAL, None),
1728
                  (constants.RS_NORMAL, [])]
1729

    
1730
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1731
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1732
                         [_GetExpResult(name)
1733
                          for name in utils.NiceSort(expnames)])
1734
      finally:
1735
        tlock.release()
1736

    
1737
    # Test shared acquire
1738
    def _Acquire(lock, shared, ev, notify):
1739
      lock.acquire(shared=shared)
1740
      try:
1741
        notify.set()
1742
        ev.wait()
1743
      finally:
1744
        lock.release()
1745

    
1746
    for tlock1 in locks[::11]:
1747
      for tlock2 in locks[::-15]:
1748
        if tlock2 == tlock1:
1749
          # Avoid deadlocks
1750
          continue
1751

    
1752
        for tlock3 in locks[::10]:
1753
          if tlock3 in (tlock2, tlock1):
1754
            # Avoid deadlocks
1755
            continue
1756

    
1757
          releaseev = threading.Event()
1758

    
1759
          # Acquire locks
1760
          acquireev = []
1761
          tthreads1 = []
1762
          for i in range(3):
1763
            ev = threading.Event()
1764
            tthreads1.append(self._addThread(target=_Acquire,
1765
                                             args=(tlock1, 1, releaseev, ev)))
1766
            acquireev.append(ev)
1767

    
1768
          ev = threading.Event()
1769
          tthread2 = self._addThread(target=_Acquire,
1770
                                     args=(tlock2, 1, releaseev, ev))
1771
          acquireev.append(ev)
1772

    
1773
          ev = threading.Event()
1774
          tthread3 = self._addThread(target=_Acquire,
1775
                                     args=(tlock3, 0, releaseev, ev))
1776
          acquireev.append(ev)
1777

    
1778
          # Wait for all locks to be acquired
1779
          for i in acquireev:
1780
            i.wait()
1781

    
1782
          # Check query result
1783
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1784
          response = objects.QueryResponse.FromDict(result)
1785
          for (name, mode, owner) in response.data:
1786
            (name_status, name_value) = name
1787
            (owner_status, owner_value) = owner
1788

    
1789
            self.assertEqual(name_status, constants.RS_NORMAL)
1790
            self.assertEqual(owner_status, constants.RS_NORMAL)
1791

    
1792
            if name_value == tlock1.name:
1793
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1794
              self.assertEqual(set(owner_value),
1795
                               set(i.getName() for i in tthreads1))
1796
              continue
1797

    
1798
            if name_value == tlock2.name:
1799
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1800
              self.assertEqual(owner_value, [tthread2.getName()])
1801
              continue
1802

    
1803
            if name_value == tlock3.name:
1804
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1805
              self.assertEqual(owner_value, [tthread3.getName()])
1806
              continue
1807

    
1808
            self.assert_(name_value in expnames)
1809
            self.assertEqual(mode, (constants.RS_NORMAL, None))
1810
            self.assert_(owner_value is None)
1811

    
1812
          # Release locks again
1813
          releaseev.set()
1814

    
1815
          self._waitThreads()
1816

    
1817
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1818
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
1819
                           [[(constants.RS_NORMAL, name),
1820
                             (constants.RS_NORMAL, None),
1821
                             (constants.RS_NORMAL, None)]
1822
                            for name in utils.NiceSort(expnames)])
1823

    
1824
  def testDelete(self):
1825
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1826

    
1827
    self.assertEqual(len(self.lm._locks), 1)
1828
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1829
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1830
                     [[(constants.RS_NORMAL, lock.name),
1831
                       (constants.RS_NORMAL, None),
1832
                       (constants.RS_NORMAL, None)]])
1833

    
1834
    lock.delete()
1835

    
1836
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1837
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1838
                     [[(constants.RS_NORMAL, lock.name),
1839
                       (constants.RS_NORMAL, "deleted"),
1840
                       (constants.RS_NORMAL, None)]])
1841
    self.assertEqual(len(self.lm._locks), 1)
1842

    
1843
  def testPending(self):
1844
    def _Acquire(lock, shared, prev, next):
1845
      prev.wait()
1846

    
1847
      lock.acquire(shared=shared, test_notify=next.set)
1848
      try:
1849
        pass
1850
      finally:
1851
        lock.release()
1852

    
1853
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
1854

    
1855
    for shared in [0, 1]:
1856
      lock.acquire()
1857
      try:
1858
        self.assertEqual(len(self.lm._locks), 1)
1859
        result = self.lm.QueryLocks(["name", "mode", "owner"])
1860
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1861
                         [[(constants.RS_NORMAL, lock.name),
1862
                           (constants.RS_NORMAL, "exclusive"),
1863
                           (constants.RS_NORMAL,
1864
                            [threading.currentThread().getName()])]])
1865

    
1866
        threads = []
1867

    
1868
        first = threading.Event()
1869
        prev = first
1870

    
1871
        for i in range(5):
1872
          ev = threading.Event()
1873
          threads.append(self._addThread(target=_Acquire,
1874
                                          args=(lock, shared, prev, ev)))
1875
          prev = ev
1876

    
1877
        # Start acquires
1878
        first.set()
1879

    
1880
        # Wait for last acquire to start waiting
1881
        prev.wait()
1882

    
1883
        # NOTE: This works only because QueryLocks will acquire the
1884
        # lock-internal lock again and won't be able to get the information
1885
        # until it has the lock. By then the acquire should be registered in
1886
        # SharedLock.__pending (otherwise it's a bug).
1887

    
1888
        # All acquires are waiting now
1889
        if shared:
1890
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
1891
        else:
1892
          pending = [("exclusive", [t.getName()]) for t in threads]
1893

    
1894
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1895
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1896
                         [[(constants.RS_NORMAL, lock.name),
1897
                           (constants.RS_NORMAL, "exclusive"),
1898
                           (constants.RS_NORMAL,
1899
                            [threading.currentThread().getName()]),
1900
                           (constants.RS_NORMAL, pending)]])
1901

    
1902
        self.assertEqual(len(self.lm._locks), 1)
1903
      finally:
1904
        lock.release()
1905

    
1906
      self._waitThreads()
1907

    
1908
      # No pending acquires
1909
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1910
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
1911
                       [[(constants.RS_NORMAL, lock.name),
1912
                         (constants.RS_NORMAL, None),
1913
                         (constants.RS_NORMAL, None),
1914
                         (constants.RS_NORMAL, [])]])
1915

    
1916
      self.assertEqual(len(self.lm._locks), 1)
1917

    
1918
  def testDeleteAndRecreate(self):
1919
    lname = "TestLock101923193"
1920

    
1921
    # Create some locks with the same name and keep all references
1922
    locks = [locking.SharedLock(lname, monitor=self.lm)
1923
             for _ in range(5)]
1924

    
1925
    self.assertEqual(len(self.lm._locks), len(locks))
1926

    
1927
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1928
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1929
                     [[(constants.RS_NORMAL, lname),
1930
                       (constants.RS_NORMAL, None),
1931
                       (constants.RS_NORMAL, None)]] * 5)
1932

    
1933
    locks[2].delete()
1934

    
1935
    # Check information order
1936
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1937
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1938
                     [[(constants.RS_NORMAL, lname),
1939
                       (constants.RS_NORMAL, None),
1940
                       (constants.RS_NORMAL, None)]] * 2 +
1941
                     [[(constants.RS_NORMAL, lname),
1942
                       (constants.RS_NORMAL, "deleted"),
1943
                       (constants.RS_NORMAL, None)]] +
1944
                     [[(constants.RS_NORMAL, lname),
1945
                       (constants.RS_NORMAL, None),
1946
                       (constants.RS_NORMAL, None)]] * 2)
1947

    
1948
    locks[1].acquire(shared=0)
1949

    
1950
    last_status = [
1951
      [(constants.RS_NORMAL, lname),
1952
       (constants.RS_NORMAL, None),
1953
       (constants.RS_NORMAL, None)],
1954
      [(constants.RS_NORMAL, lname),
1955
       (constants.RS_NORMAL, "exclusive"),
1956
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
1957
      [(constants.RS_NORMAL, lname),
1958
       (constants.RS_NORMAL, "deleted"),
1959
       (constants.RS_NORMAL, None)],
1960
      [(constants.RS_NORMAL, lname),
1961
       (constants.RS_NORMAL, None),
1962
       (constants.RS_NORMAL, None)],
1963
      [(constants.RS_NORMAL, lname),
1964
       (constants.RS_NORMAL, None),
1965
       (constants.RS_NORMAL, None)],
1966
      ]
1967

    
1968
    # Check information order
1969
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1970
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
1971

    
1972
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
1973
    self.assertEqual(len(self.lm._locks), len(locks))
1974

    
1975
    # Check lock deletion
1976
    for idx in range(len(locks)):
1977
      del locks[0]
1978
      assert gc.isenabled()
1979
      gc.collect()
1980
      self.assertEqual(len(self.lm._locks), len(locks))
1981
      result = self.lm.QueryLocks(["name", "mode", "owner"])
1982
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
1983
                       last_status[idx + 1:])
1984

    
1985
    # All locks should have been deleted
1986
    assert not locks
1987
    self.assertFalse(self.lm._locks)
1988

    
1989
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1990
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
1991

    
1992

    
1993
if __name__ == '__main__':
1994
  testutils.GanetiTestProgram()