Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 24d16f76

History | View | Annotate | Download (63.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import itertools
32

    
33
from ganeti import constants
34
from ganeti import locking
35
from ganeti import errors
36
from ganeti import utils
37
from ganeti import compat
38
from ganeti import objects
39
from ganeti import query
40

    
41
import testutils
42

    
43

    
44
# This is used to test the ssynchronize decorator.
45
# Since it's passed as input to a decorator it must be declared as a global.
46
_decoratorlock = locking.SharedLock("decorator lock")
47

    
48
#: List for looping tests
49
ITERATIONS = range(8)
50

    
51

    
52
def _Repeat(fn):
53
  """Decorator for executing a function many times"""
54
  def wrapper(*args, **kwargs):
55
    for i in ITERATIONS:
56
      fn(*args, **kwargs)
57
  return wrapper
58

    
59

    
60
def SafeSleep(duration):
61
  start = time.time()
62
  while True:
63
    delay = start + duration - time.time()
64
    if delay <= 0.0:
65
      break
66
    time.sleep(delay)
67

    
68

    
69
class _ThreadedTestCase(unittest.TestCase):
70
  """Test class that supports adding/waiting on threads"""
71
  def setUp(self):
72
    unittest.TestCase.setUp(self)
73
    self.done = Queue.Queue(0)
74
    self.threads = []
75

    
76
  def _addThread(self, *args, **kwargs):
77
    """Create and remember a new thread"""
78
    t = threading.Thread(*args, **kwargs)
79
    self.threads.append(t)
80
    t.start()
81
    return t
82

    
83
  def _waitThreads(self):
84
    """Wait for all our threads to finish"""
85
    for t in self.threads:
86
      t.join(60)
87
      self.failIf(t.isAlive())
88
    self.threads = []
89

    
90

    
91
class _ConditionTestCase(_ThreadedTestCase):
92
  """Common test case for conditions"""
93

    
94
  def setUp(self, cls):
95
    _ThreadedTestCase.setUp(self)
96
    self.lock = threading.Lock()
97
    self.cond = cls(self.lock)
98

    
99
  def _testAcquireRelease(self):
100
    self.assertFalse(self.cond._is_owned())
101
    self.assertRaises(RuntimeError, self.cond.wait)
102
    self.assertRaises(RuntimeError, self.cond.notifyAll)
103

    
104
    self.cond.acquire()
105
    self.assert_(self.cond._is_owned())
106
    self.cond.notifyAll()
107
    self.assert_(self.cond._is_owned())
108
    self.cond.release()
109

    
110
    self.assertFalse(self.cond._is_owned())
111
    self.assertRaises(RuntimeError, self.cond.wait)
112
    self.assertRaises(RuntimeError, self.cond.notifyAll)
113

    
114
  def _testNotification(self):
115
    def _NotifyAll():
116
      self.done.put("NE")
117
      self.cond.acquire()
118
      self.done.put("NA")
119
      self.cond.notifyAll()
120
      self.done.put("NN")
121
      self.cond.release()
122

    
123
    self.cond.acquire()
124
    self._addThread(target=_NotifyAll)
125
    self.assertEqual(self.done.get(True, 1), "NE")
126
    self.assertRaises(Queue.Empty, self.done.get_nowait)
127
    self.cond.wait()
128
    self.assertEqual(self.done.get(True, 1), "NA")
129
    self.assertEqual(self.done.get(True, 1), "NN")
130
    self.assert_(self.cond._is_owned())
131
    self.cond.release()
132
    self.assertFalse(self.cond._is_owned())
133

    
134

    
135
class TestSingleNotifyPipeCondition(_ConditionTestCase):
136
  """SingleNotifyPipeCondition tests"""
137

    
138
  def setUp(self):
139
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
140

    
141
  def testAcquireRelease(self):
142
    self._testAcquireRelease()
143

    
144
  def testNotification(self):
145
    self._testNotification()
146

    
147
  def testWaitReuse(self):
148
    self.cond.acquire()
149
    self.cond.wait(0)
150
    self.cond.wait(0.1)
151
    self.cond.release()
152

    
153
  def testNoNotifyReuse(self):
154
    self.cond.acquire()
155
    self.cond.notifyAll()
156
    self.assertRaises(RuntimeError, self.cond.wait)
157
    self.assertRaises(RuntimeError, self.cond.notifyAll)
158
    self.cond.release()
159

    
160

    
161
class TestPipeCondition(_ConditionTestCase):
162
  """PipeCondition tests"""
163

    
164
  def setUp(self):
165
    _ConditionTestCase.setUp(self, locking.PipeCondition)
166

    
167
  def testAcquireRelease(self):
168
    self._testAcquireRelease()
169

    
170
  def testNotification(self):
171
    self._testNotification()
172

    
173
  def _TestWait(self, fn):
174
    threads = [
175
      self._addThread(target=fn),
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      ]
179

    
180
    # Wait for threads to be waiting
181
    for _ in threads:
182
      self.assertEqual(self.done.get(True, 1), "A")
183

    
184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
185

    
186
    self.cond.acquire()
187
    self.assertEqual(len(self.cond._waiters), 3)
188
    self.assertEqual(self.cond._waiters, set(threads))
189
    # This new thread can't acquire the lock, and thus call wait, before we
190
    # release it
191
    self._addThread(target=fn)
192
    self.cond.notifyAll()
193
    self.assertRaises(Queue.Empty, self.done.get_nowait)
194
    self.cond.release()
195

    
196
    # We should now get 3 W and 1 A (for the new thread) in whatever order
197
    w = 0
198
    a = 0
199
    for i in range(4):
200
      got = self.done.get(True, 1)
201
      if got == "W":
202
        w += 1
203
      elif got == "A":
204
        a += 1
205
      else:
206
        self.fail("Got %s on the done queue" % got)
207

    
208
    self.assertEqual(w, 3)
209
    self.assertEqual(a, 1)
210

    
211
    self.cond.acquire()
212
    self.cond.notifyAll()
213
    self.cond.release()
214
    self._waitThreads()
215
    self.assertEqual(self.done.get_nowait(), "W")
216
    self.assertRaises(Queue.Empty, self.done.get_nowait)
217

    
218
  def testBlockingWait(self):
219
    def _BlockingWait():
220
      self.cond.acquire()
221
      self.done.put("A")
222
      self.cond.wait()
223
      self.cond.release()
224
      self.done.put("W")
225

    
226
    self._TestWait(_BlockingWait)
227

    
228
  def testLongTimeoutWait(self):
229
    def _Helper():
230
      self.cond.acquire()
231
      self.done.put("A")
232
      self.cond.wait(15.0)
233
      self.cond.release()
234
      self.done.put("W")
235

    
236
    self._TestWait(_Helper)
237

    
238
  def _TimeoutWait(self, timeout, check):
239
    self.cond.acquire()
240
    self.cond.wait(timeout)
241
    self.cond.release()
242
    self.done.put(check)
243

    
244
  def testShortTimeoutWait(self):
245
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._waitThreads()
248
    self.assertEqual(self.done.get_nowait(), "T1")
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertRaises(Queue.Empty, self.done.get_nowait)
251

    
252
  def testZeroTimeoutWait(self):
253
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._waitThreads()
257
    self.assertEqual(self.done.get_nowait(), "T0")
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertRaises(Queue.Empty, self.done.get_nowait)
261

    
262

    
263
class TestSharedLock(_ThreadedTestCase):
264
  """SharedLock tests"""
265

    
266
  def setUp(self):
267
    _ThreadedTestCase.setUp(self)
268
    self.sl = locking.SharedLock("TestSharedLock")
269

    
270
  def testSequenceAndOwnership(self):
271
    self.assertFalse(self.sl._is_owned())
272
    self.sl.acquire(shared=1)
273
    self.assert_(self.sl._is_owned())
274
    self.assert_(self.sl._is_owned(shared=1))
275
    self.assertFalse(self.sl._is_owned(shared=0))
276
    self.sl.release()
277
    self.assertFalse(self.sl._is_owned())
278
    self.sl.acquire()
279
    self.assert_(self.sl._is_owned())
280
    self.assertFalse(self.sl._is_owned(shared=1))
281
    self.assert_(self.sl._is_owned(shared=0))
282
    self.sl.release()
283
    self.assertFalse(self.sl._is_owned())
284
    self.sl.acquire(shared=1)
285
    self.assert_(self.sl._is_owned())
286
    self.assert_(self.sl._is_owned(shared=1))
287
    self.assertFalse(self.sl._is_owned(shared=0))
288
    self.sl.release()
289
    self.assertFalse(self.sl._is_owned())
290

    
291
  def testBooleanValue(self):
292
    # semaphores are supposed to return a true value on a successful acquire
293
    self.assert_(self.sl.acquire(shared=1))
294
    self.sl.release()
295
    self.assert_(self.sl.acquire())
296
    self.sl.release()
297

    
298
  def testDoubleLockingStoE(self):
299
    self.sl.acquire(shared=1)
300
    self.assertRaises(AssertionError, self.sl.acquire)
301

    
302
  def testDoubleLockingEtoS(self):
303
    self.sl.acquire()
304
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
305

    
306
  def testDoubleLockingStoS(self):
307
    self.sl.acquire(shared=1)
308
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
309

    
310
  def testDoubleLockingEtoE(self):
311
    self.sl.acquire()
312
    self.assertRaises(AssertionError, self.sl.acquire)
313

    
314
  # helper functions: called in a separate thread they acquire the lock, send
315
  # their identifier on the done queue, then release it.
316
  def _doItSharer(self):
317
    try:
318
      self.sl.acquire(shared=1)
319
      self.done.put('SHR')
320
      self.sl.release()
321
    except errors.LockError:
322
      self.done.put('ERR')
323

    
324
  def _doItExclusive(self):
325
    try:
326
      self.sl.acquire()
327
      self.done.put('EXC')
328
      self.sl.release()
329
    except errors.LockError:
330
      self.done.put('ERR')
331

    
332
  def _doItDelete(self):
333
    try:
334
      self.sl.delete()
335
      self.done.put('DEL')
336
    except errors.LockError:
337
      self.done.put('ERR')
338

    
339
  def testSharersCanCoexist(self):
340
    self.sl.acquire(shared=1)
341
    threading.Thread(target=self._doItSharer).start()
342
    self.assert_(self.done.get(True, 1))
343
    self.sl.release()
344

    
345
  @_Repeat
346
  def testExclusiveBlocksExclusive(self):
347
    self.sl.acquire()
348
    self._addThread(target=self._doItExclusive)
349
    self.assertRaises(Queue.Empty, self.done.get_nowait)
350
    self.sl.release()
351
    self._waitThreads()
352
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
353

    
354
  @_Repeat
355
  def testExclusiveBlocksDelete(self):
356
    self.sl.acquire()
357
    self._addThread(target=self._doItDelete)
358
    self.assertRaises(Queue.Empty, self.done.get_nowait)
359
    self.sl.release()
360
    self._waitThreads()
361
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
362
    self.sl = locking.SharedLock(self.sl.name)
363

    
364
  @_Repeat
365
  def testExclusiveBlocksSharer(self):
366
    self.sl.acquire()
367
    self._addThread(target=self._doItSharer)
368
    self.assertRaises(Queue.Empty, self.done.get_nowait)
369
    self.sl.release()
370
    self._waitThreads()
371
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
372

    
373
  @_Repeat
374
  def testSharerBlocksExclusive(self):
375
    self.sl.acquire(shared=1)
376
    self._addThread(target=self._doItExclusive)
377
    self.assertRaises(Queue.Empty, self.done.get_nowait)
378
    self.sl.release()
379
    self._waitThreads()
380
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
381

    
382
  @_Repeat
383
  def testSharerBlocksDelete(self):
384
    self.sl.acquire(shared=1)
385
    self._addThread(target=self._doItDelete)
386
    self.assertRaises(Queue.Empty, self.done.get_nowait)
387
    self.sl.release()
388
    self._waitThreads()
389
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
390
    self.sl = locking.SharedLock(self.sl.name)
391

    
392
  @_Repeat
393
  def testWaitingExclusiveBlocksSharer(self):
394
    """SKIPPED testWaitingExclusiveBlockSharer"""
395
    return
396

    
397
    self.sl.acquire(shared=1)
398
    # the lock is acquired in shared mode...
399
    self._addThread(target=self._doItExclusive)
400
    # ...but now an exclusive is waiting...
401
    self._addThread(target=self._doItSharer)
402
    # ...so the sharer should be blocked as well
403
    self.assertRaises(Queue.Empty, self.done.get_nowait)
404
    self.sl.release()
405
    self._waitThreads()
406
    # The exclusive passed before
407
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
408
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
409

    
410
  @_Repeat
411
  def testWaitingSharerBlocksExclusive(self):
412
    """SKIPPED testWaitingSharerBlocksExclusive"""
413
    return
414

    
415
    self.sl.acquire()
416
    # the lock is acquired in exclusive mode...
417
    self._addThread(target=self._doItSharer)
418
    # ...but now a sharer is waiting...
419
    self._addThread(target=self._doItExclusive)
420
    # ...the exclusive is waiting too...
421
    self.assertRaises(Queue.Empty, self.done.get_nowait)
422
    self.sl.release()
423
    self._waitThreads()
424
    # The sharer passed before
425
    self.assertEqual(self.done.get_nowait(), 'SHR')
426
    self.assertEqual(self.done.get_nowait(), 'EXC')
427

    
428
  def testDelete(self):
429
    self.sl.delete()
430
    self.assertRaises(errors.LockError, self.sl.acquire)
431
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
432
    self.assertRaises(errors.LockError, self.sl.delete)
433

    
434
  def testDeleteTimeout(self):
435
    self.sl.delete(timeout=60)
436

    
437
  def testNoDeleteIfSharer(self):
438
    self.sl.acquire(shared=1)
439
    self.assertRaises(AssertionError, self.sl.delete)
440

    
441
  @_Repeat
442
  def testDeletePendingSharersExclusiveDelete(self):
443
    self.sl.acquire()
444
    self._addThread(target=self._doItSharer)
445
    self._addThread(target=self._doItSharer)
446
    self._addThread(target=self._doItExclusive)
447
    self._addThread(target=self._doItDelete)
448
    self.sl.delete()
449
    self._waitThreads()
450
    # The threads who were pending return ERR
451
    for _ in range(4):
452
      self.assertEqual(self.done.get_nowait(), 'ERR')
453
    self.sl = locking.SharedLock(self.sl.name)
454

    
455
  @_Repeat
456
  def testDeletePendingDeleteExclusiveSharers(self):
457
    self.sl.acquire()
458
    self._addThread(target=self._doItDelete)
459
    self._addThread(target=self._doItExclusive)
460
    self._addThread(target=self._doItSharer)
461
    self._addThread(target=self._doItSharer)
462
    self.sl.delete()
463
    self._waitThreads()
464
    # The two threads who were pending return both ERR
465
    self.assertEqual(self.done.get_nowait(), 'ERR')
466
    self.assertEqual(self.done.get_nowait(), 'ERR')
467
    self.assertEqual(self.done.get_nowait(), 'ERR')
468
    self.assertEqual(self.done.get_nowait(), 'ERR')
469
    self.sl = locking.SharedLock(self.sl.name)
470

    
471
  @_Repeat
472
  def testExclusiveAcquireTimeout(self):
473
    for shared in [0, 1]:
474
      on_queue = threading.Event()
475
      release_exclusive = threading.Event()
476

    
477
      def _LockExclusive():
478
        self.sl.acquire(shared=0, test_notify=on_queue.set)
479
        self.done.put("A: start wait")
480
        release_exclusive.wait()
481
        self.done.put("A: end wait")
482
        self.sl.release()
483

    
484
      # Start thread to hold lock in exclusive mode
485
      self._addThread(target=_LockExclusive)
486

    
487
      # Wait for wait to begin
488
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
489

    
490
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
491
      # on the queue
492
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
493
                                      test_notify=release_exclusive.set))
494

    
495
      self.done.put("got 2nd")
496
      self.sl.release()
497

    
498
      self._waitThreads()
499

    
500
      self.assertEqual(self.done.get_nowait(), "A: end wait")
501
      self.assertEqual(self.done.get_nowait(), "got 2nd")
502
      self.assertRaises(Queue.Empty, self.done.get_nowait)
503

    
504
  @_Repeat
505
  def testAcquireExpiringTimeout(self):
506
    def _AcquireWithTimeout(shared, timeout):
507
      if not self.sl.acquire(shared=shared, timeout=timeout):
508
        self.done.put("timeout")
509

    
510
    for shared in [0, 1]:
511
      # Lock exclusively
512
      self.sl.acquire()
513

    
514
      # Start shared acquires with timeout between 0 and 20 ms
515
      for i in range(11):
516
        self._addThread(target=_AcquireWithTimeout,
517
                        args=(shared, i * 2.0 / 1000.0))
518

    
519
      # Wait for threads to finish (makes sure the acquire timeout expires
520
      # before releasing the lock)
521
      self._waitThreads()
522

    
523
      # Release lock
524
      self.sl.release()
525

    
526
      for _ in range(11):
527
        self.assertEqual(self.done.get_nowait(), "timeout")
528

    
529
      self.assertRaises(Queue.Empty, self.done.get_nowait)
530

    
531
  @_Repeat
532
  def testSharedSkipExclusiveAcquires(self):
533
    # Tests whether shared acquires jump in front of exclusive acquires in the
534
    # queue.
535

    
536
    def _Acquire(shared, name, notify_ev, wait_ev):
537
      if notify_ev:
538
        notify_fn = notify_ev.set
539
      else:
540
        notify_fn = None
541

    
542
      if wait_ev:
543
        wait_ev.wait()
544

    
545
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
546
        return
547

    
548
      self.done.put(name)
549
      self.sl.release()
550

    
551
    # Get exclusive lock while we fill the queue
552
    self.sl.acquire()
553

    
554
    shrcnt1 = 5
555
    shrcnt2 = 7
556
    shrcnt3 = 9
557
    shrcnt4 = 2
558

    
559
    # Add acquires using threading.Event for synchronization. They'll be
560
    # acquired exactly in the order defined in this list.
561
    acquires = (shrcnt1 * [(1, "shared 1")] +
562
                3 * [(0, "exclusive 1")] +
563
                shrcnt2 * [(1, "shared 2")] +
564
                shrcnt3 * [(1, "shared 3")] +
565
                shrcnt4 * [(1, "shared 4")] +
566
                3 * [(0, "exclusive 2")])
567

    
568
    ev_cur = None
569
    ev_prev = None
570

    
571
    for args in acquires:
572
      ev_cur = threading.Event()
573
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
574
      ev_prev = ev_cur
575

    
576
    # Wait for last acquire to start
577
    ev_prev.wait()
578

    
579
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
580
    # together
581
    self.assertEqual(self.sl._count_pending(), 7)
582

    
583
    # Release exclusive lock and wait
584
    self.sl.release()
585

    
586
    self._waitThreads()
587

    
588
    # Check sequence
589
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
590
      # Shared locks aren't guaranteed to be notified in order, but they'll be
591
      # first
592
      tmp = self.done.get_nowait()
593
      if tmp == "shared 1":
594
        shrcnt1 -= 1
595
      elif tmp == "shared 2":
596
        shrcnt2 -= 1
597
      elif tmp == "shared 3":
598
        shrcnt3 -= 1
599
      elif tmp == "shared 4":
600
        shrcnt4 -= 1
601
    self.assertEqual(shrcnt1, 0)
602
    self.assertEqual(shrcnt2, 0)
603
    self.assertEqual(shrcnt3, 0)
604
    self.assertEqual(shrcnt3, 0)
605

    
606
    for _ in range(3):
607
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
608

    
609
    for _ in range(3):
610
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
611

    
612
    self.assertRaises(Queue.Empty, self.done.get_nowait)
613

    
614
  @_Repeat
615
  def testMixedAcquireTimeout(self):
616
    sync = threading.Event()
617

    
618
    def _AcquireShared(ev):
619
      if not self.sl.acquire(shared=1, timeout=None):
620
        return
621

    
622
      self.done.put("shared")
623

    
624
      # Notify main thread
625
      ev.set()
626

    
627
      # Wait for notification from main thread
628
      sync.wait()
629

    
630
      # Release lock
631
      self.sl.release()
632

    
633
    acquires = []
634
    for _ in range(3):
635
      ev = threading.Event()
636
      self._addThread(target=_AcquireShared, args=(ev, ))
637
      acquires.append(ev)
638

    
639
    # Wait for all acquires to finish
640
    for i in acquires:
641
      i.wait()
642

    
643
    self.assertEqual(self.sl._count_pending(), 0)
644

    
645
    # Try to get exclusive lock
646
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
647

    
648
    # Acquire exclusive without timeout
649
    exclsync = threading.Event()
650
    exclev = threading.Event()
651

    
652
    def _AcquireExclusive():
653
      if not self.sl.acquire(shared=0):
654
        return
655

    
656
      self.done.put("exclusive")
657

    
658
      # Notify main thread
659
      exclev.set()
660

    
661
      # Wait for notification from main thread
662
      exclsync.wait()
663

    
664
      self.sl.release()
665

    
666
    self._addThread(target=_AcquireExclusive)
667

    
668
    # Try to get exclusive lock
669
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
670

    
671
    # Make all shared holders release their locks
672
    sync.set()
673

    
674
    # Wait for exclusive acquire to succeed
675
    exclev.wait()
676

    
677
    self.assertEqual(self.sl._count_pending(), 0)
678

    
679
    # Try to get exclusive lock
680
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
681

    
682
    def _AcquireSharedSimple():
683
      if self.sl.acquire(shared=1, timeout=None):
684
        self.done.put("shared2")
685
        self.sl.release()
686

    
687
    for _ in range(10):
688
      self._addThread(target=_AcquireSharedSimple)
689

    
690
    # Tell exclusive lock to release
691
    exclsync.set()
692

    
693
    # Wait for everything to finish
694
    self._waitThreads()
695

    
696
    self.assertEqual(self.sl._count_pending(), 0)
697

    
698
    # Check sequence
699
    for _ in range(3):
700
      self.assertEqual(self.done.get_nowait(), "shared")
701

    
702
    self.assertEqual(self.done.get_nowait(), "exclusive")
703

    
704
    for _ in range(10):
705
      self.assertEqual(self.done.get_nowait(), "shared2")
706

    
707
    self.assertRaises(Queue.Empty, self.done.get_nowait)
708

    
709
  def testPriority(self):
710
    # Acquire in exclusive mode
711
    self.assert_(self.sl.acquire(shared=0))
712

    
713
    # Queue acquires
714
    def _Acquire(prev, next, shared, priority, result):
715
      prev.wait()
716
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
717
      try:
718
        self.done.put(result)
719
      finally:
720
        self.sl.release()
721

    
722
    counter = itertools.count(0)
723
    priorities = range(-20, 30)
724
    first = threading.Event()
725
    prev = first
726

    
727
    # Data structure:
728
    # {
729
    #   priority:
730
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
731
    #      (shared/exclusive, ...),
732
    #      ...,
733
    #     ],
734
    # }
735
    perprio = {}
736

    
737
    # References shared acquire per priority in L{perprio}. Data structure:
738
    # {
739
    #   priority: (shared=1, set(acquire names), set(pending threads)),
740
    # }
741
    prioshared = {}
742

    
743
    for seed in [4979, 9523, 14902, 32440]:
744
      # Use a deterministic random generator
745
      rnd = random.Random(seed)
746
      for priority in [rnd.choice(priorities) for _ in range(30)]:
747
        modes = [0, 1]
748
        rnd.shuffle(modes)
749
        for shared in modes:
750
          # Unique name
751
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
752

    
753
          ev = threading.Event()
754
          thread = self._addThread(target=_Acquire,
755
                                   args=(prev, ev, shared, priority, acqname))
756
          prev = ev
757

    
758
          # Record expected aqcuire, see above for structure
759
          data = (shared, set([acqname]), set([thread]))
760
          priolist = perprio.setdefault(priority, [])
761
          if shared:
762
            priosh = prioshared.get(priority, None)
763
            if priosh:
764
              # Shared acquires are merged
765
              for i, j in zip(priosh[1:], data[1:]):
766
                i.update(j)
767
              assert data[0] == priosh[0]
768
            else:
769
              prioshared[priority] = data
770
              priolist.append(data)
771
          else:
772
            priolist.append(data)
773

    
774
    # Start all acquires and wait for them
775
    first.set()
776
    prev.wait()
777

    
778
    # Check lock information
779
    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
780
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
781
                     (self.sl.name, "exclusive",
782
                      [threading.currentThread().getName()], None))
783

    
784
    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
785

    
786
    # Let threads acquire the lock
787
    self.sl.release()
788

    
789
    # Wait for everything to finish
790
    self._waitThreads()
791

    
792
    self.assert_(self.sl._check_empty())
793

    
794
    # Check acquires by priority
795
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
796
      for (_, names, _) in acquires:
797
        # For shared acquires, the set will contain 1..n entries. For exclusive
798
        # acquires only one.
799
        while names:
800
          names.remove(self.done.get_nowait())
801
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
802

    
803
    self.assertRaises(Queue.Empty, self.done.get_nowait)
804

    
805
  def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
806
    self.assertEqual(name, self.sl.name)
807
    self.assert_(mode is None)
808
    self.assert_(owner is None)
809

    
810
    self.assertEqual([(pendmode, sorted(waiting))
811
                      for (pendmode, waiting) in pending],
812
                     [(["exclusive", "shared"][int(bool(shared))],
813
                       sorted(t.getName() for t in threads))
814
                      for acquires in [perprio[i]
815
                                       for i in sorted(perprio.keys())]
816
                      for (shared, _, threads) in acquires])
817

    
818

    
819
class TestSharedLockInCondition(_ThreadedTestCase):
820
  """SharedLock as a condition lock tests"""
821

    
822
  def setUp(self):
823
    _ThreadedTestCase.setUp(self)
824
    self.sl = locking.SharedLock("TestSharedLockInCondition")
825
    self.setCondition()
826

    
827
  def setCondition(self):
828
    self.cond = threading.Condition(self.sl)
829

    
830
  def testKeepMode(self):
831
    self.cond.acquire(shared=1)
832
    self.assert_(self.sl._is_owned(shared=1))
833
    self.cond.wait(0)
834
    self.assert_(self.sl._is_owned(shared=1))
835
    self.cond.release()
836
    self.cond.acquire(shared=0)
837
    self.assert_(self.sl._is_owned(shared=0))
838
    self.cond.wait(0)
839
    self.assert_(self.sl._is_owned(shared=0))
840
    self.cond.release()
841

    
842

    
843
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
844
  """SharedLock as a pipe condition lock tests"""
845

    
846
  def setCondition(self):
847
    self.cond = locking.PipeCondition(self.sl)
848

    
849

    
850
class TestSSynchronizedDecorator(_ThreadedTestCase):
851
  """Shared Lock Synchronized decorator test"""
852

    
853
  def setUp(self):
854
    _ThreadedTestCase.setUp(self)
855

    
856
  @locking.ssynchronized(_decoratorlock)
857
  def _doItExclusive(self):
858
    self.assert_(_decoratorlock._is_owned())
859
    self.done.put('EXC')
860

    
861
  @locking.ssynchronized(_decoratorlock, shared=1)
862
  def _doItSharer(self):
863
    self.assert_(_decoratorlock._is_owned(shared=1))
864
    self.done.put('SHR')
865

    
866
  def testDecoratedFunctions(self):
867
    self._doItExclusive()
868
    self.assertFalse(_decoratorlock._is_owned())
869
    self._doItSharer()
870
    self.assertFalse(_decoratorlock._is_owned())
871

    
872
  def testSharersCanCoexist(self):
873
    _decoratorlock.acquire(shared=1)
874
    threading.Thread(target=self._doItSharer).start()
875
    self.assert_(self.done.get(True, 1))
876
    _decoratorlock.release()
877

    
878
  @_Repeat
879
  def testExclusiveBlocksExclusive(self):
880
    _decoratorlock.acquire()
881
    self._addThread(target=self._doItExclusive)
882
    # give it a bit of time to check that it's not actually doing anything
883
    self.assertRaises(Queue.Empty, self.done.get_nowait)
884
    _decoratorlock.release()
885
    self._waitThreads()
886
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
887

    
888
  @_Repeat
889
  def testExclusiveBlocksSharer(self):
890
    _decoratorlock.acquire()
891
    self._addThread(target=self._doItSharer)
892
    self.assertRaises(Queue.Empty, self.done.get_nowait)
893
    _decoratorlock.release()
894
    self._waitThreads()
895
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
896

    
897
  @_Repeat
898
  def testSharerBlocksExclusive(self):
899
    _decoratorlock.acquire(shared=1)
900
    self._addThread(target=self._doItExclusive)
901
    self.assertRaises(Queue.Empty, self.done.get_nowait)
902
    _decoratorlock.release()
903
    self._waitThreads()
904
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
905

    
906

    
907
class TestLockSet(_ThreadedTestCase):
908
  """LockSet tests"""
909

    
910
  def setUp(self):
911
    _ThreadedTestCase.setUp(self)
912
    self._setUpLS()
913

    
914
  def _setUpLS(self):
915
    """Helper to (re)initialize the lock set"""
916
    self.resources = ['one', 'two', 'three']
917
    self.ls = locking.LockSet(self.resources, "TestLockSet")
918

    
919
  def testResources(self):
920
    self.assertEquals(self.ls._names(), set(self.resources))
921
    newls = locking.LockSet([], "TestLockSet.testResources")
922
    self.assertEquals(newls._names(), set())
923

    
924
  def testAcquireRelease(self):
925
    self.assert_(self.ls.acquire('one'))
926
    self.assertEquals(self.ls._list_owned(), set(['one']))
927
    self.ls.release()
928
    self.assertEquals(self.ls._list_owned(), set())
929
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
930
    self.assertEquals(self.ls._list_owned(), set(['one']))
931
    self.ls.release()
932
    self.assertEquals(self.ls._list_owned(), set())
933
    self.ls.acquire(['one', 'two', 'three'])
934
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
935
    self.ls.release('one')
936
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
937
    self.ls.release(['three'])
938
    self.assertEquals(self.ls._list_owned(), set(['two']))
939
    self.ls.release()
940
    self.assertEquals(self.ls._list_owned(), set())
941
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
942
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
943
    self.ls.release()
944
    self.assertEquals(self.ls._list_owned(), set())
945

    
946
  def testNoDoubleAcquire(self):
947
    self.ls.acquire('one')
948
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
949
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
950
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
951
    self.ls.release()
952
    self.ls.acquire(['one', 'three'])
953
    self.ls.release('one')
954
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
955
    self.ls.release('three')
956

    
957
  def testNoWrongRelease(self):
958
    self.assertRaises(AssertionError, self.ls.release)
959
    self.ls.acquire('one')
960
    self.assertRaises(AssertionError, self.ls.release, 'two')
961

    
962
  def testAddRemove(self):
963
    self.ls.add('four')
964
    self.assertEquals(self.ls._list_owned(), set())
965
    self.assert_('four' in self.ls._names())
966
    self.ls.add(['five', 'six', 'seven'], acquired=1)
967
    self.assert_('five' in self.ls._names())
968
    self.assert_('six' in self.ls._names())
969
    self.assert_('seven' in self.ls._names())
970
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
971
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
972
    self.assert_('five' not in self.ls._names())
973
    self.assert_('six' not in self.ls._names())
974
    self.assertEquals(self.ls._list_owned(), set(['seven']))
975
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
976
    self.ls.remove('seven')
977
    self.assert_('seven' not in self.ls._names())
978
    self.assertEquals(self.ls._list_owned(), set([]))
979
    self.ls.acquire(None, shared=1)
980
    self.assertRaises(AssertionError, self.ls.add, 'eight')
981
    self.ls.release()
982
    self.ls.acquire(None)
983
    self.ls.add('eight', acquired=1)
984
    self.assert_('eight' in self.ls._names())
985
    self.assert_('eight' in self.ls._list_owned())
986
    self.ls.add('nine')
987
    self.assert_('nine' in self.ls._names())
988
    self.assert_('nine' not in self.ls._list_owned())
989
    self.ls.release()
990
    self.ls.remove(['two'])
991
    self.assert_('two' not in self.ls._names())
992
    self.ls.acquire('three')
993
    self.assertEquals(self.ls.remove(['three']), ['three'])
994
    self.assert_('three' not in self.ls._names())
995
    self.assertEquals(self.ls.remove('three'), [])
996
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
997
    self.assert_('one' not in self.ls._names())
998

    
999
  def testRemoveNonBlocking(self):
1000
    self.ls.acquire('one')
1001
    self.assertEquals(self.ls.remove('one'), ['one'])
1002
    self.ls.acquire(['two', 'three'])
1003
    self.assertEquals(self.ls.remove(['two', 'three']),
1004
                      ['two', 'three'])
1005

    
1006
  def testNoDoubleAdd(self):
1007
    self.assertRaises(errors.LockError, self.ls.add, 'two')
1008
    self.ls.add('four')
1009
    self.assertRaises(errors.LockError, self.ls.add, 'four')
1010

    
1011
  def testNoWrongRemoves(self):
1012
    self.ls.acquire(['one', 'three'], shared=1)
1013
    # Cannot remove 'two' while holding something which is not a superset
1014
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1015
    # Cannot remove 'three' as we are sharing it
1016
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1017

    
1018
  def testAcquireSetLock(self):
1019
    # acquire the set-lock exclusively
1020
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1021
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1022
    self.assertEquals(self.ls._is_owned(), True)
1023
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1024
    # I can still add/remove elements...
1025
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1026
    self.assert_(self.ls.add('six'))
1027
    self.ls.release()
1028
    # share the set-lock
1029
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1030
    # adding new elements is not possible
1031
    self.assertRaises(AssertionError, self.ls.add, 'five')
1032
    self.ls.release()
1033

    
1034
  def testAcquireWithRepetitions(self):
1035
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1036
                      set(['two', 'two', 'three']))
1037
    self.ls.release(['two', 'two'])
1038
    self.assertEquals(self.ls._list_owned(), set(['three']))
1039

    
1040
  def testEmptyAcquire(self):
1041
    # Acquire an empty list of locks...
1042
    self.assertEquals(self.ls.acquire([]), set())
1043
    self.assertEquals(self.ls._list_owned(), set())
1044
    # New locks can still be addded
1045
    self.assert_(self.ls.add('six'))
1046
    # "re-acquiring" is not an issue, since we had really acquired nothing
1047
    self.assertEquals(self.ls.acquire([], shared=1), set())
1048
    self.assertEquals(self.ls._list_owned(), set())
1049
    # We haven't really acquired anything, so we cannot release
1050
    self.assertRaises(AssertionError, self.ls.release)
1051

    
1052
  def _doLockSet(self, names, shared):
1053
    try:
1054
      self.ls.acquire(names, shared=shared)
1055
      self.done.put('DONE')
1056
      self.ls.release()
1057
    except errors.LockError:
1058
      self.done.put('ERR')
1059

    
1060
  def _doAddSet(self, names):
1061
    try:
1062
      self.ls.add(names, acquired=1)
1063
      self.done.put('DONE')
1064
      self.ls.release()
1065
    except errors.LockError:
1066
      self.done.put('ERR')
1067

    
1068
  def _doRemoveSet(self, names):
1069
    self.done.put(self.ls.remove(names))
1070

    
1071
  @_Repeat
1072
  def testConcurrentSharedAcquire(self):
1073
    self.ls.acquire(['one', 'two'], shared=1)
1074
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1075
    self._waitThreads()
1076
    self.assertEqual(self.done.get_nowait(), 'DONE')
1077
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1078
    self._waitThreads()
1079
    self.assertEqual(self.done.get_nowait(), 'DONE')
1080
    self._addThread(target=self._doLockSet, args=('three', 1))
1081
    self._waitThreads()
1082
    self.assertEqual(self.done.get_nowait(), 'DONE')
1083
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1084
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1085
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1086
    self.ls.release()
1087
    self._waitThreads()
1088
    self.assertEqual(self.done.get_nowait(), 'DONE')
1089
    self.assertEqual(self.done.get_nowait(), 'DONE')
1090

    
1091
  @_Repeat
1092
  def testConcurrentExclusiveAcquire(self):
1093
    self.ls.acquire(['one', 'two'])
1094
    self._addThread(target=self._doLockSet, args=('three', 1))
1095
    self._waitThreads()
1096
    self.assertEqual(self.done.get_nowait(), 'DONE')
1097
    self._addThread(target=self._doLockSet, args=('three', 0))
1098
    self._waitThreads()
1099
    self.assertEqual(self.done.get_nowait(), 'DONE')
1100
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1101
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1102
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1103
    self._addThread(target=self._doLockSet, args=('one', 0))
1104
    self._addThread(target=self._doLockSet, args=('one', 1))
1105
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1106
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1107
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1108
    self.ls.release()
1109
    self._waitThreads()
1110
    for _ in range(6):
1111
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1112

    
1113
  @_Repeat
1114
  def testSimpleAcquireTimeoutExpiring(self):
1115
    names = sorted(self.ls._names())
1116
    self.assert_(len(names) >= 3)
1117

    
1118
    # Get name of first lock
1119
    first = names[0]
1120

    
1121
    # Get name of last lock
1122
    last = names.pop()
1123

    
1124
    checks = [
1125
      # Block first and try to lock it again
1126
      (first, first),
1127

    
1128
      # Block last and try to lock all locks
1129
      (None, first),
1130

    
1131
      # Block last and try to lock it again
1132
      (last, last),
1133
      ]
1134

    
1135
    for (wanted, block) in checks:
1136
      # Lock in exclusive mode
1137
      self.assert_(self.ls.acquire(block, shared=0))
1138

    
1139
      def _AcquireOne():
1140
        # Try to get the same lock again with a timeout (should never succeed)
1141
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1142
        if acquired:
1143
          self.done.put("acquired")
1144
          self.ls.release()
1145
        else:
1146
          self.assert_(acquired is None)
1147
          self.assertFalse(self.ls._list_owned())
1148
          self.assertFalse(self.ls._is_owned())
1149
          self.done.put("not acquired")
1150

    
1151
      self._addThread(target=_AcquireOne)
1152

    
1153
      # Wait for timeout in thread to expire
1154
      self._waitThreads()
1155

    
1156
      # Release exclusive lock again
1157
      self.ls.release()
1158

    
1159
      self.assertEqual(self.done.get_nowait(), "not acquired")
1160
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1161

    
1162
  @_Repeat
1163
  def testDelayedAndExpiringLockAcquire(self):
1164
    self._setUpLS()
1165
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1166

    
1167
    for expire in (False, True):
1168
      names = sorted(self.ls._names())
1169
      self.assertEqual(len(names), 8)
1170

    
1171
      lock_ev = dict([(i, threading.Event()) for i in names])
1172

    
1173
      # Lock all in exclusive mode
1174
      self.assert_(self.ls.acquire(names, shared=0))
1175

    
1176
      if expire:
1177
        # We'll wait at least 300ms per lock
1178
        lockwait = len(names) * [0.3]
1179

    
1180
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1181
        # this gives us up to 2.4s to fail.
1182
        lockall_timeout = 0.4
1183
      else:
1184
        # This should finish rather quickly
1185
        lockwait = None
1186
        lockall_timeout = len(names) * 5.0
1187

    
1188
      def _LockAll():
1189
        def acquire_notification(name):
1190
          if not expire:
1191
            self.done.put("getting %s" % name)
1192

    
1193
          # Kick next lock
1194
          lock_ev[name].set()
1195

    
1196
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1197
                           test_notify=acquire_notification):
1198
          self.done.put("got all")
1199
          self.ls.release()
1200
        else:
1201
          self.done.put("timeout on all")
1202

    
1203
        # Notify all locks
1204
        for ev in lock_ev.values():
1205
          ev.set()
1206

    
1207
      t = self._addThread(target=_LockAll)
1208

    
1209
      for idx, name in enumerate(names):
1210
        # Wait for actual acquire on this lock to start
1211
        lock_ev[name].wait(10.0)
1212

    
1213
        if expire and t.isAlive():
1214
          # Wait some time after getting the notification to make sure the lock
1215
          # acquire will expire
1216
          SafeSleep(lockwait[idx])
1217

    
1218
        self.ls.release(names=name)
1219

    
1220
      self.assertFalse(self.ls._list_owned())
1221

    
1222
      self._waitThreads()
1223

    
1224
      if expire:
1225
        # Not checking which locks were actually acquired. Doing so would be
1226
        # too timing-dependant.
1227
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1228
      else:
1229
        for i in names:
1230
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1231
        self.assertEqual(self.done.get_nowait(), "got all")
1232
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1233

    
1234
  @_Repeat
1235
  def testConcurrentRemove(self):
1236
    self.ls.add('four')
1237
    self.ls.acquire(['one', 'two', 'four'])
1238
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1239
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1240
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1241
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1242
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1243
    self.ls.remove('one')
1244
    self.ls.release()
1245
    self._waitThreads()
1246
    for i in range(4):
1247
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1248
    self.ls.add(['five', 'six'], acquired=1)
1249
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1250
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1251
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1252
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1253
    self.ls.remove('five')
1254
    self.ls.release()
1255
    self._waitThreads()
1256
    for i in range(4):
1257
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1258
    self.ls.acquire(['three', 'four'])
1259
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1260
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1261
    self.ls.remove('four')
1262
    self._waitThreads()
1263
    self.assertEqual(self.done.get_nowait(), ['six'])
1264
    self._addThread(target=self._doRemoveSet, args=(['two']))
1265
    self._waitThreads()
1266
    self.assertEqual(self.done.get_nowait(), ['two'])
1267
    self.ls.release()
1268
    # reset lockset
1269
    self._setUpLS()
1270

    
1271
  @_Repeat
1272
  def testConcurrentSharedSetLock(self):
1273
    # share the set-lock...
1274
    self.ls.acquire(None, shared=1)
1275
    # ...another thread can share it too
1276
    self._addThread(target=self._doLockSet, args=(None, 1))
1277
    self._waitThreads()
1278
    self.assertEqual(self.done.get_nowait(), 'DONE')
1279
    # ...or just share some elements
1280
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1281
    self._waitThreads()
1282
    self.assertEqual(self.done.get_nowait(), 'DONE')
1283
    # ...but not add new ones or remove any
1284
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1285
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1286
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1287
    # this just releases the set-lock
1288
    self.ls.release([])
1289
    t.join(60)
1290
    self.assertEqual(self.done.get_nowait(), 'DONE')
1291
    # release the lock on the actual elements so remove() can proceed too
1292
    self.ls.release()
1293
    self._waitThreads()
1294
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1295
    # reset lockset
1296
    self._setUpLS()
1297

    
1298
  @_Repeat
1299
  def testConcurrentExclusiveSetLock(self):
1300
    # acquire the set-lock...
1301
    self.ls.acquire(None, shared=0)
1302
    # ...no one can do anything else
1303
    self._addThread(target=self._doLockSet, args=(None, 1))
1304
    self._addThread(target=self._doLockSet, args=(None, 0))
1305
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1306
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1307
    self._addThread(target=self._doAddSet, args=(['nine']))
1308
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1309
    self.ls.release()
1310
    self._waitThreads()
1311
    for _ in range(5):
1312
      self.assertEqual(self.done.get(True, 1), 'DONE')
1313
    # cleanup
1314
    self._setUpLS()
1315

    
1316
  @_Repeat
1317
  def testConcurrentSetLockAdd(self):
1318
    self.ls.acquire('one')
1319
    # Another thread wants the whole SetLock
1320
    self._addThread(target=self._doLockSet, args=(None, 0))
1321
    self._addThread(target=self._doLockSet, args=(None, 1))
1322
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1323
    self.assertRaises(AssertionError, self.ls.add, 'four')
1324
    self.ls.release()
1325
    self._waitThreads()
1326
    self.assertEqual(self.done.get_nowait(), 'DONE')
1327
    self.assertEqual(self.done.get_nowait(), 'DONE')
1328
    self.ls.acquire(None)
1329
    self._addThread(target=self._doLockSet, args=(None, 0))
1330
    self._addThread(target=self._doLockSet, args=(None, 1))
1331
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1332
    self.ls.add('four')
1333
    self.ls.add('five', acquired=1)
1334
    self.ls.add('six', acquired=1, shared=1)
1335
    self.assertEquals(self.ls._list_owned(),
1336
      set(['one', 'two', 'three', 'five', 'six']))
1337
    self.assertEquals(self.ls._is_owned(), True)
1338
    self.assertEquals(self.ls._names(),
1339
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1340
    self.ls.release()
1341
    self._waitThreads()
1342
    self.assertEqual(self.done.get_nowait(), 'DONE')
1343
    self.assertEqual(self.done.get_nowait(), 'DONE')
1344
    self._setUpLS()
1345

    
1346
  @_Repeat
1347
  def testEmptyLockSet(self):
1348
    # get the set-lock
1349
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1350
    # now empty it...
1351
    self.ls.remove(['one', 'two', 'three'])
1352
    # and adds/locks by another thread still wait
1353
    self._addThread(target=self._doAddSet, args=(['nine']))
1354
    self._addThread(target=self._doLockSet, args=(None, 1))
1355
    self._addThread(target=self._doLockSet, args=(None, 0))
1356
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1357
    self.ls.release()
1358
    self._waitThreads()
1359
    for _ in range(3):
1360
      self.assertEqual(self.done.get_nowait(), 'DONE')
1361
    # empty it again...
1362
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1363
    # now share it...
1364
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1365
    # other sharers can go, adds still wait
1366
    self._addThread(target=self._doLockSet, args=(None, 1))
1367
    self._waitThreads()
1368
    self.assertEqual(self.done.get_nowait(), 'DONE')
1369
    self._addThread(target=self._doAddSet, args=(['nine']))
1370
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1371
    self.ls.release()
1372
    self._waitThreads()
1373
    self.assertEqual(self.done.get_nowait(), 'DONE')
1374
    self._setUpLS()
1375

    
1376
  def testPriority(self):
1377
    def _Acquire(prev, next, name, priority, success_fn):
1378
      prev.wait()
1379
      self.assert_(self.ls.acquire(name, shared=0,
1380
                                   priority=priority,
1381
                                   test_notify=lambda _: next.set()))
1382
      try:
1383
        success_fn()
1384
      finally:
1385
        self.ls.release()
1386

    
1387
    # Get all in exclusive mode
1388
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1389

    
1390
    done_two = Queue.Queue(0)
1391

    
1392
    first = threading.Event()
1393
    prev = first
1394

    
1395
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1396
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1397

    
1398
    # Use a deterministic random generator
1399
    random.Random(741).shuffle(acquires)
1400

    
1401
    for (name, prio, done) in acquires:
1402
      ev = threading.Event()
1403
      self._addThread(target=_Acquire,
1404
                      args=(prev, ev, name, prio,
1405
                            compat.partial(done.put, "Prio%s" % prio)))
1406
      prev = ev
1407

    
1408
    # Start acquires
1409
    first.set()
1410

    
1411
    # Wait for last acquire to start
1412
    prev.wait()
1413

    
1414
    # Let threads acquire locks
1415
    self.ls.release()
1416

    
1417
    # Wait for threads to finish
1418
    self._waitThreads()
1419

    
1420
    for i in range(1, 33):
1421
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1422
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1423

    
1424
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1425
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1426

    
1427

    
1428
class TestGanetiLockManager(_ThreadedTestCase):
1429

    
1430
  def setUp(self):
1431
    _ThreadedTestCase.setUp(self)
1432
    self.nodes=['n1', 'n2']
1433
    self.nodegroups=['g1', 'g2']
1434
    self.instances=['i1', 'i2', 'i3']
1435
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1436
                                        self.instances)
1437

    
1438
  def tearDown(self):
1439
    # Don't try this at home...
1440
    locking.GanetiLockManager._instance = None
1441

    
1442
  def testLockingConstants(self):
1443
    # The locking library internally cheats by assuming its constants have some
1444
    # relationships with each other. Check those hold true.
1445
    # This relationship is also used in the Processor to recursively acquire
1446
    # the right locks. Again, please don't break it.
1447
    for i in range(len(locking.LEVELS)):
1448
      self.assertEqual(i, locking.LEVELS[i])
1449

    
1450
  def testDoubleGLFails(self):
1451
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1452

    
1453
  def testLockNames(self):
1454
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1455
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1456
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1457
                     set(self.nodegroups))
1458
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1459
                     set(self.instances))
1460

    
1461
  def testInitAndResources(self):
1462
    locking.GanetiLockManager._instance = None
1463
    self.GL = locking.GanetiLockManager([], [], [])
1464
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1465
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1466
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1467
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1468

    
1469
    locking.GanetiLockManager._instance = None
1470
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1471
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1472
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1473
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1474
                                    set(self.nodegroups))
1475
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1476

    
1477
    locking.GanetiLockManager._instance = None
1478
    self.GL = locking.GanetiLockManager([], [], self.instances)
1479
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1480
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1481
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1482
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1483
                     set(self.instances))
1484

    
1485
  def testAcquireRelease(self):
1486
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1487
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1488
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1489
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1490
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1491
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1492
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1493
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1494
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1495
    self.GL.release(locking.LEVEL_NODE)
1496
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1497
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1498
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1499
    self.GL.release(locking.LEVEL_NODEGROUP)
1500
    self.GL.release(locking.LEVEL_INSTANCE)
1501
    self.assertRaises(errors.LockError, self.GL.acquire,
1502
                      locking.LEVEL_INSTANCE, ['i5'])
1503
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1504
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1505

    
1506
  def testAcquireWholeSets(self):
1507
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1508
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1509
                      set(self.instances))
1510
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1511
                      set(self.instances))
1512
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1513
                      set(self.nodegroups))
1514
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1515
                      set(self.nodegroups))
1516
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1517
                      set(self.nodes))
1518
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1519
                      set(self.nodes))
1520
    self.GL.release(locking.LEVEL_NODE)
1521
    self.GL.release(locking.LEVEL_NODEGROUP)
1522
    self.GL.release(locking.LEVEL_INSTANCE)
1523
    self.GL.release(locking.LEVEL_CLUSTER)
1524

    
1525
  def testAcquireWholeAndPartial(self):
1526
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1527
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1528
                      set(self.instances))
1529
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1530
                      set(self.instances))
1531
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1532
                      set(['n2']))
1533
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1534
                      set(['n2']))
1535
    self.GL.release(locking.LEVEL_NODE)
1536
    self.GL.release(locking.LEVEL_INSTANCE)
1537
    self.GL.release(locking.LEVEL_CLUSTER)
1538

    
1539
  def testBGLDependency(self):
1540
    self.assertRaises(AssertionError, self.GL.acquire,
1541
                      locking.LEVEL_NODE, ['n1', 'n2'])
1542
    self.assertRaises(AssertionError, self.GL.acquire,
1543
                      locking.LEVEL_INSTANCE, ['i3'])
1544
    self.assertRaises(AssertionError, self.GL.acquire,
1545
                      locking.LEVEL_NODEGROUP, ['g1'])
1546
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1547
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1548
    self.assertRaises(AssertionError, self.GL.release,
1549
                      locking.LEVEL_CLUSTER, ['BGL'])
1550
    self.assertRaises(AssertionError, self.GL.release,
1551
                      locking.LEVEL_CLUSTER)
1552
    self.GL.release(locking.LEVEL_NODE)
1553
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1554
    self.assertRaises(AssertionError, self.GL.release,
1555
                      locking.LEVEL_CLUSTER, ['BGL'])
1556
    self.assertRaises(AssertionError, self.GL.release,
1557
                      locking.LEVEL_CLUSTER)
1558
    self.GL.release(locking.LEVEL_INSTANCE)
1559
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1560
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1561
    self.assertRaises(AssertionError, self.GL.release,
1562
                      locking.LEVEL_CLUSTER, ['BGL'])
1563
    self.assertRaises(AssertionError, self.GL.release,
1564
                      locking.LEVEL_CLUSTER)
1565
    self.GL.release(locking.LEVEL_NODEGROUP)
1566
    self.GL.release(locking.LEVEL_CLUSTER)
1567

    
1568
  def testWrongOrder(self):
1569
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1570
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1571
    self.assertRaises(AssertionError, self.GL.acquire,
1572
                      locking.LEVEL_NODE, ['n1'])
1573
    self.assertRaises(AssertionError, self.GL.acquire,
1574
                      locking.LEVEL_NODEGROUP, ['g1'])
1575
    self.assertRaises(AssertionError, self.GL.acquire,
1576
                      locking.LEVEL_INSTANCE, ['i2'])
1577

    
1578
  def testModifiableLevels(self):
1579
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1580
                      ['BGL2'])
1581
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1582
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1583
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1584
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1585
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1586
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1587
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1588
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1589
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1590
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1591
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1592
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1593
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1594
                      ['BGL2'])
1595

    
1596
  # Helper function to run as a thread that shared the BGL and then acquires
1597
  # some locks at another level.
1598
  def _doLock(self, level, names, shared):
1599
    try:
1600
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1601
      self.GL.acquire(level, names, shared=shared)
1602
      self.done.put('DONE')
1603
      self.GL.release(level)
1604
      self.GL.release(locking.LEVEL_CLUSTER)
1605
    except errors.LockError:
1606
      self.done.put('ERR')
1607

    
1608
  @_Repeat
1609
  def testConcurrency(self):
1610
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1611
    self._addThread(target=self._doLock,
1612
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1613
    self._waitThreads()
1614
    self.assertEqual(self.done.get_nowait(), 'DONE')
1615
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1616
    self._addThread(target=self._doLock,
1617
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1618
    self._waitThreads()
1619
    self.assertEqual(self.done.get_nowait(), 'DONE')
1620
    self._addThread(target=self._doLock,
1621
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1622
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1623
    self.GL.release(locking.LEVEL_INSTANCE)
1624
    self._waitThreads()
1625
    self.assertEqual(self.done.get_nowait(), 'DONE')
1626
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1627
    self._addThread(target=self._doLock,
1628
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1629
    self._waitThreads()
1630
    self.assertEqual(self.done.get_nowait(), 'DONE')
1631
    self._addThread(target=self._doLock,
1632
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1633
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1634
    self.GL.release(locking.LEVEL_INSTANCE)
1635
    self._waitThreads()
1636
    self.assertEqual(self.done.get(True, 1), 'DONE')
1637
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1638

    
1639

    
1640
class TestLockMonitor(_ThreadedTestCase):
1641
  def setUp(self):
1642
    _ThreadedTestCase.setUp(self)
1643
    self.lm = locking.LockMonitor()
1644

    
1645
  def testSingleThread(self):
1646
    locks = []
1647

    
1648
    for i in range(100):
1649
      name = "TestLock%s" % i
1650
      locks.append(locking.SharedLock(name, monitor=self.lm))
1651

    
1652
    self.assertEqual(len(self.lm._locks), len(locks))
1653
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1654
    self.assertEqual(len(result.fields), 1)
1655
    self.assertEqual(len(result.data), 100)
1656

    
1657
    # Delete all locks
1658
    del locks[:]
1659

    
1660
    # The garbage collector might needs some time
1661
    def _CheckLocks():
1662
      if self.lm._locks:
1663
        raise utils.RetryAgain()
1664

    
1665
    utils.Retry(_CheckLocks, 0.1, 30.0)
1666

    
1667
    self.assertFalse(self.lm._locks)
1668

    
1669
  def testMultiThread(self):
1670
    locks = []
1671

    
1672
    def _CreateLock(prev, next, name):
1673
      prev.wait()
1674
      locks.append(locking.SharedLock(name, monitor=self.lm))
1675
      if next:
1676
        next.set()
1677

    
1678
    expnames = []
1679

    
1680
    first = threading.Event()
1681
    prev = first
1682

    
1683
    # Use a deterministic random generator
1684
    for i in random.Random(4263).sample(range(100), 33):
1685
      name = "MtTestLock%s" % i
1686
      expnames.append(name)
1687

    
1688
      ev = threading.Event()
1689
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1690
      prev = ev
1691

    
1692
    # Add locks
1693
    first.set()
1694
    self._waitThreads()
1695

    
1696
    # Check order in which locks were added
1697
    self.assertEqual([i.name for i in locks], expnames)
1698

    
1699
    # Check query result
1700
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1701
    self.assert_(isinstance(result, dict))
1702
    response = objects.QueryResponse.FromDict(result)
1703
    self.assertEqual(response.data,
1704
                     [[(constants.QRFS_NORMAL, name),
1705
                       (constants.QRFS_NORMAL, None),
1706
                       (constants.QRFS_NORMAL, None),
1707
                       (constants.QRFS_NORMAL, [])]
1708
                      for name in utils.NiceSort(expnames)])
1709
    self.assertEqual(len(response.fields), 4)
1710
    self.assertEqual(["name", "mode", "owner", "pending"],
1711
                     [fdef.name for fdef in response.fields])
1712

    
1713
    # Test exclusive acquire
1714
    for tlock in locks[::4]:
1715
      tlock.acquire(shared=0)
1716
      try:
1717
        def _GetExpResult(name):
1718
          if tlock.name == name:
1719
            return [(constants.QRFS_NORMAL, name),
1720
                    (constants.QRFS_NORMAL, "exclusive"),
1721
                    (constants.QRFS_NORMAL,
1722
                     [threading.currentThread().getName()]),
1723
                    (constants.QRFS_NORMAL, [])]
1724
          return [(constants.QRFS_NORMAL, name),
1725
                  (constants.QRFS_NORMAL, None),
1726
                  (constants.QRFS_NORMAL, None),
1727
                  (constants.QRFS_NORMAL, [])]
1728

    
1729
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1730
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1731
                         [_GetExpResult(name)
1732
                          for name in utils.NiceSort(expnames)])
1733
      finally:
1734
        tlock.release()
1735

    
1736
    # Test shared acquire
1737
    def _Acquire(lock, shared, ev, notify):
1738
      lock.acquire(shared=shared)
1739
      try:
1740
        notify.set()
1741
        ev.wait()
1742
      finally:
1743
        lock.release()
1744

    
1745
    for tlock1 in locks[::11]:
1746
      for tlock2 in locks[::-15]:
1747
        if tlock2 == tlock1:
1748
          # Avoid deadlocks
1749
          continue
1750

    
1751
        for tlock3 in locks[::10]:
1752
          if tlock3 in (tlock2, tlock1):
1753
            # Avoid deadlocks
1754
            continue
1755

    
1756
          releaseev = threading.Event()
1757

    
1758
          # Acquire locks
1759
          acquireev = []
1760
          tthreads1 = []
1761
          for i in range(3):
1762
            ev = threading.Event()
1763
            tthreads1.append(self._addThread(target=_Acquire,
1764
                                             args=(tlock1, 1, releaseev, ev)))
1765
            acquireev.append(ev)
1766

    
1767
          ev = threading.Event()
1768
          tthread2 = self._addThread(target=_Acquire,
1769
                                     args=(tlock2, 1, releaseev, ev))
1770
          acquireev.append(ev)
1771

    
1772
          ev = threading.Event()
1773
          tthread3 = self._addThread(target=_Acquire,
1774
                                     args=(tlock3, 0, releaseev, ev))
1775
          acquireev.append(ev)
1776

    
1777
          # Wait for all locks to be acquired
1778
          for i in acquireev:
1779
            i.wait()
1780

    
1781
          # Check query result
1782
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1783
          response = objects.QueryResponse.FromDict(result)
1784
          for (name, mode, owner) in response.data:
1785
            (name_status, name_value) = name
1786
            (owner_status, owner_value) = owner
1787

    
1788
            self.assertEqual(name_status, constants.QRFS_NORMAL)
1789
            self.assertEqual(owner_status, constants.QRFS_NORMAL)
1790

    
1791
            if name_value == tlock1.name:
1792
              self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
1793
              self.assertEqual(set(owner_value),
1794
                               set(i.getName() for i in tthreads1))
1795
              continue
1796

    
1797
            if name_value == tlock2.name:
1798
              self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
1799
              self.assertEqual(owner_value, [tthread2.getName()])
1800
              continue
1801

    
1802
            if name_value == tlock3.name:
1803
              self.assertEqual(mode, (constants.QRFS_NORMAL, "exclusive"))
1804
              self.assertEqual(owner_value, [tthread3.getName()])
1805
              continue
1806

    
1807
            self.assert_(name_value in expnames)
1808
            self.assertEqual(mode, (constants.QRFS_NORMAL, None))
1809
            self.assert_(owner_value is None)
1810

    
1811
          # Release locks again
1812
          releaseev.set()
1813

    
1814
          self._waitThreads()
1815

    
1816
          result = self.lm.QueryLocks(["name", "mode", "owner"])
1817
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
1818
                           [[(constants.QRFS_NORMAL, name),
1819
                             (constants.QRFS_NORMAL, None),
1820
                             (constants.QRFS_NORMAL, None)]
1821
                            for name in utils.NiceSort(expnames)])
1822

    
1823
  def testDelete(self):
1824
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1825

    
1826
    self.assertEqual(len(self.lm._locks), 1)
1827
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1828
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1829
                     [[(constants.QRFS_NORMAL, lock.name),
1830
                       (constants.QRFS_NORMAL, None),
1831
                       (constants.QRFS_NORMAL, None)]])
1832

    
1833
    lock.delete()
1834

    
1835
    result = self.lm.QueryLocks(["name", "mode", "owner"])
1836
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
1837
                     [[(constants.QRFS_NORMAL, lock.name),
1838
                       (constants.QRFS_NORMAL, "deleted"),
1839
                       (constants.QRFS_NORMAL, None)]])
1840
    self.assertEqual(len(self.lm._locks), 1)
1841

    
1842
  def testPending(self):
1843
    def _Acquire(lock, shared, prev, next):
1844
      prev.wait()
1845

    
1846
      lock.acquire(shared=shared, test_notify=next.set)
1847
      try:
1848
        pass
1849
      finally:
1850
        lock.release()
1851

    
1852
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
1853

    
1854
    for shared in [0, 1]:
1855
      lock.acquire()
1856
      try:
1857
        self.assertEqual(len(self.lm._locks), 1)
1858
        result = self.lm.QueryLocks(["name", "mode", "owner"])
1859
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1860
                         [[(constants.QRFS_NORMAL, lock.name),
1861
                           (constants.QRFS_NORMAL, "exclusive"),
1862
                           (constants.QRFS_NORMAL,
1863
                            [threading.currentThread().getName()])]])
1864

    
1865
        threads = []
1866

    
1867
        first = threading.Event()
1868
        prev = first
1869

    
1870
        for i in range(5):
1871
          ev = threading.Event()
1872
          threads.append(self._addThread(target=_Acquire,
1873
                                          args=(lock, shared, prev, ev)))
1874
          prev = ev
1875

    
1876
        # Start acquires
1877
        first.set()
1878

    
1879
        # Wait for last acquire to start waiting
1880
        prev.wait()
1881

    
1882
        # NOTE: This works only because QueryLocks will acquire the
1883
        # lock-internal lock again and won't be able to get the information
1884
        # until it has the lock. By then the acquire should be registered in
1885
        # SharedLock.__pending (otherwise it's a bug).
1886

    
1887
        # All acquires are waiting now
1888
        if shared:
1889
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
1890
        else:
1891
          pending = [("exclusive", [t.getName()]) for t in threads]
1892

    
1893
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1894
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1895
                         [[(constants.QRFS_NORMAL, lock.name),
1896
                           (constants.QRFS_NORMAL, "exclusive"),
1897
                           (constants.QRFS_NORMAL,
1898
                            [threading.currentThread().getName()]),
1899
                           (constants.QRFS_NORMAL, pending)]])
1900

    
1901
        self.assertEqual(len(self.lm._locks), 1)
1902
      finally:
1903
        lock.release()
1904

    
1905
      self._waitThreads()
1906

    
1907
      # No pending acquires
1908
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1909
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
1910
                       [[(constants.QRFS_NORMAL, lock.name),
1911
                         (constants.QRFS_NORMAL, None),
1912
                         (constants.QRFS_NORMAL, None),
1913
                         (constants.QRFS_NORMAL, [])]])
1914

    
1915
      self.assertEqual(len(self.lm._locks), 1)
1916

    
1917

    
1918
if __name__ == '__main__':
1919
  testutils.GanetiTestProgram()