Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 8c114acd

History | View | Annotate | Download (83 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190
    # This new thread can't acquire the lock, and thus call wait, before we
191
    # release it
192
    self._addThread(target=fn)
193
    self.cond.notifyAll()
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195
    self.cond.release()
196

    
197
    # We should now get 3 W and 1 A (for the new thread) in whatever order
198
    w = 0
199
    a = 0
200
    for i in range(4):
201
      got = self.done.get(True, 1)
202
      if got == "W":
203
        w += 1
204
      elif got == "A":
205
        a += 1
206
      else:
207
        self.fail("Got %s on the done queue" % got)
208

    
209
    self.assertEqual(w, 3)
210
    self.assertEqual(a, 1)
211

    
212
    self.cond.acquire()
213
    self.cond.notifyAll()
214
    self.cond.release()
215
    self._waitThreads()
216
    self.assertEqual(self.done.get_nowait(), "W")
217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
218

    
219
  def testBlockingWait(self):
220
    def _BlockingWait():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(None)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_BlockingWait)
228

    
229
  def testLongTimeoutWait(self):
230
    def _Helper():
231
      self.cond.acquire()
232
      self.done.put("A")
233
      self.cond.wait(15.0)
234
      self.cond.release()
235
      self.done.put("W")
236

    
237
    self._TestWait(_Helper)
238

    
239
  def _TimeoutWait(self, timeout, check):
240
    self.cond.acquire()
241
    self.cond.wait(timeout)
242
    self.cond.release()
243
    self.done.put(check)
244

    
245
  def testShortTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248
    self._waitThreads()
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertEqual(self.done.get_nowait(), "T1")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253
  def testZeroTimeoutWait(self):
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257
    self._waitThreads()
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertEqual(self.done.get_nowait(), "T0")
261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262

    
263

    
264
class TestSharedLock(_ThreadedTestCase):
265
  """SharedLock tests"""
266

    
267
  def setUp(self):
268
    _ThreadedTestCase.setUp(self)
269
    self.sl = locking.SharedLock("TestSharedLock")
270

    
271
  def testSequenceAndOwnership(self):
272
    self.assertFalse(self.sl.is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl.is_owned())
275
    self.assert_(self.sl.is_owned(shared=1))
276
    self.assertFalse(self.sl.is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl.is_owned())
279
    self.sl.acquire()
280
    self.assert_(self.sl.is_owned())
281
    self.assertFalse(self.sl.is_owned(shared=1))
282
    self.assert_(self.sl.is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl.is_owned())
285
    self.sl.acquire(shared=1)
286
    self.assert_(self.sl.is_owned())
287
    self.assert_(self.sl.is_owned(shared=1))
288
    self.assertFalse(self.sl.is_owned(shared=0))
289
    self.sl.release()
290
    self.assertFalse(self.sl.is_owned())
291

    
292
  def testBooleanValue(self):
293
    # semaphores are supposed to return a true value on a successful acquire
294
    self.assert_(self.sl.acquire(shared=1))
295
    self.sl.release()
296
    self.assert_(self.sl.acquire())
297
    self.sl.release()
298

    
299
  def testDoubleLockingStoE(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  def testDoubleLockingEtoS(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingStoS(self):
308
    self.sl.acquire(shared=1)
309
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310

    
311
  def testDoubleLockingEtoE(self):
312
    self.sl.acquire()
313
    self.assertRaises(AssertionError, self.sl.acquire)
314

    
315
  # helper functions: called in a separate thread they acquire the lock, send
316
  # their identifier on the done queue, then release it.
317
  def _doItSharer(self):
318
    try:
319
      self.sl.acquire(shared=1)
320
      self.done.put("SHR")
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put("ERR")
324

    
325
  def _doItExclusive(self):
326
    try:
327
      self.sl.acquire()
328
      self.done.put("EXC")
329
      self.sl.release()
330
    except errors.LockError:
331
      self.done.put("ERR")
332

    
333
  def _doItDelete(self):
334
    try:
335
      self.sl.delete()
336
      self.done.put("DEL")
337
    except errors.LockError:
338
      self.done.put("ERR")
339

    
340
  def testSharersCanCoexist(self):
341
    self.sl.acquire(shared=1)
342
    threading.Thread(target=self._doItSharer).start()
343
    self.assert_(self.done.get(True, 1))
344
    self.sl.release()
345

    
346
  @_Repeat
347
  def testExclusiveBlocksExclusive(self):
348
    self.sl.acquire()
349
    self._addThread(target=self._doItExclusive)
350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
351
    self.sl.release()
352
    self._waitThreads()
353
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
354

    
355
  @_Repeat
356
  def testExclusiveBlocksDelete(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItDelete)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), "DEL")
363
    self.sl = locking.SharedLock(self.sl.name)
364

    
365
  @_Repeat
366
  def testExclusiveBlocksSharer(self):
367
    self.sl.acquire()
368
    self._addThread(target=self._doItSharer)
369
    self.assertRaises(Queue.Empty, self.done.get_nowait)
370
    self.sl.release()
371
    self._waitThreads()
372
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
373

    
374
  @_Repeat
375
  def testSharerBlocksExclusive(self):
376
    self.sl.acquire(shared=1)
377
    self._addThread(target=self._doItExclusive)
378
    self.assertRaises(Queue.Empty, self.done.get_nowait)
379
    self.sl.release()
380
    self._waitThreads()
381
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
382

    
383
  @_Repeat
384
  def testSharerBlocksDelete(self):
385
    self.sl.acquire(shared=1)
386
    self._addThread(target=self._doItDelete)
387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
388
    self.sl.release()
389
    self._waitThreads()
390
    self.failUnlessEqual(self.done.get_nowait(), "DEL")
391
    self.sl = locking.SharedLock(self.sl.name)
392

    
393
  @_Repeat
394
  def testWaitingExclusiveBlocksSharer(self):
395
    """SKIPPED testWaitingExclusiveBlockSharer"""
396
    return
397

    
398
    self.sl.acquire(shared=1)
399
    # the lock is acquired in shared mode...
400
    self._addThread(target=self._doItExclusive)
401
    # ...but now an exclusive is waiting...
402
    self._addThread(target=self._doItSharer)
403
    # ...so the sharer should be blocked as well
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    # The exclusive passed before
408
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
409
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
410

    
411
  @_Repeat
412
  def testWaitingSharerBlocksExclusive(self):
413
    """SKIPPED testWaitingSharerBlocksExclusive"""
414
    return
415

    
416
    self.sl.acquire()
417
    # the lock is acquired in exclusive mode...
418
    self._addThread(target=self._doItSharer)
419
    # ...but now a sharer is waiting...
420
    self._addThread(target=self._doItExclusive)
421
    # ...the exclusive is waiting too...
422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
423
    self.sl.release()
424
    self._waitThreads()
425
    # The sharer passed before
426
    self.assertEqual(self.done.get_nowait(), "SHR")
427
    self.assertEqual(self.done.get_nowait(), "EXC")
428

    
429
  def testDelete(self):
430
    self.sl.delete()
431
    self.assertRaises(errors.LockError, self.sl.acquire)
432
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433
    self.assertRaises(errors.LockError, self.sl.delete)
434

    
435
  def testDeleteTimeout(self):
436
    self.assertTrue(self.sl.delete(timeout=60))
437

    
438
  def testDeleteTimeoutFail(self):
439
    ready = threading.Event()
440
    finish = threading.Event()
441

    
442
    def fn():
443
      self.sl.acquire(shared=0)
444
      ready.set()
445

    
446
      finish.wait()
447
      self.sl.release()
448

    
449
    self._addThread(target=fn)
450
    ready.wait()
451

    
452
    # Test if deleting a lock owned in exclusive mode by another thread fails
453
    # to delete when a timeout is used
454
    self.assertFalse(self.sl.delete(timeout=0.02))
455

    
456
    finish.set()
457
    self._waitThreads()
458

    
459
    self.assertTrue(self.sl.delete())
460
    self.assertRaises(errors.LockError, self.sl.acquire)
461

    
462
  def testNoDeleteIfSharer(self):
463
    self.sl.acquire(shared=1)
464
    self.assertRaises(AssertionError, self.sl.delete)
465

    
466
  @_Repeat
467
  def testDeletePendingSharersExclusiveDelete(self):
468
    self.sl.acquire()
469
    self._addThread(target=self._doItSharer)
470
    self._addThread(target=self._doItSharer)
471
    self._addThread(target=self._doItExclusive)
472
    self._addThread(target=self._doItDelete)
473
    self.sl.delete()
474
    self._waitThreads()
475
    # The threads who were pending return ERR
476
    for _ in range(4):
477
      self.assertEqual(self.done.get_nowait(), "ERR")
478
    self.sl = locking.SharedLock(self.sl.name)
479

    
480
  @_Repeat
481
  def testDeletePendingDeleteExclusiveSharers(self):
482
    self.sl.acquire()
483
    self._addThread(target=self._doItDelete)
484
    self._addThread(target=self._doItExclusive)
485
    self._addThread(target=self._doItSharer)
486
    self._addThread(target=self._doItSharer)
487
    self.sl.delete()
488
    self._waitThreads()
489
    # The two threads who were pending return both ERR
490
    self.assertEqual(self.done.get_nowait(), "ERR")
491
    self.assertEqual(self.done.get_nowait(), "ERR")
492
    self.assertEqual(self.done.get_nowait(), "ERR")
493
    self.assertEqual(self.done.get_nowait(), "ERR")
494
    self.sl = locking.SharedLock(self.sl.name)
495

    
496
  @_Repeat
497
  def testExclusiveAcquireTimeout(self):
498
    for shared in [0, 1]:
499
      on_queue = threading.Event()
500
      release_exclusive = threading.Event()
501

    
502
      def _LockExclusive():
503
        self.sl.acquire(shared=0, test_notify=on_queue.set)
504
        self.done.put("A: start wait")
505
        release_exclusive.wait()
506
        self.done.put("A: end wait")
507
        self.sl.release()
508

    
509
      # Start thread to hold lock in exclusive mode
510
      self._addThread(target=_LockExclusive)
511

    
512
      # Wait for wait to begin
513
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
514

    
515
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
516
      # on the queue
517
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
518
                                      test_notify=release_exclusive.set))
519

    
520
      self.done.put("got 2nd")
521
      self.sl.release()
522

    
523
      self._waitThreads()
524

    
525
      self.assertEqual(self.done.get_nowait(), "A: end wait")
526
      self.assertEqual(self.done.get_nowait(), "got 2nd")
527
      self.assertRaises(Queue.Empty, self.done.get_nowait)
528

    
529
  @_Repeat
530
  def testAcquireExpiringTimeout(self):
531
    def _AcquireWithTimeout(shared, timeout):
532
      if not self.sl.acquire(shared=shared, timeout=timeout):
533
        self.done.put("timeout")
534

    
535
    for shared in [0, 1]:
536
      # Lock exclusively
537
      self.sl.acquire()
538

    
539
      # Start shared acquires with timeout between 0 and 20 ms
540
      for i in range(11):
541
        self._addThread(target=_AcquireWithTimeout,
542
                        args=(shared, i * 2.0 / 1000.0))
543

    
544
      # Wait for threads to finish (makes sure the acquire timeout expires
545
      # before releasing the lock)
546
      self._waitThreads()
547

    
548
      # Release lock
549
      self.sl.release()
550

    
551
      for _ in range(11):
552
        self.assertEqual(self.done.get_nowait(), "timeout")
553

    
554
      self.assertRaises(Queue.Empty, self.done.get_nowait)
555

    
556
  @_Repeat
557
  def testSharedSkipExclusiveAcquires(self):
558
    # Tests whether shared acquires jump in front of exclusive acquires in the
559
    # queue.
560

    
561
    def _Acquire(shared, name, notify_ev, wait_ev):
562
      if notify_ev:
563
        notify_fn = notify_ev.set
564
      else:
565
        notify_fn = None
566

    
567
      if wait_ev:
568
        wait_ev.wait()
569

    
570
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
571
        return
572

    
573
      self.done.put(name)
574
      self.sl.release()
575

    
576
    # Get exclusive lock while we fill the queue
577
    self.sl.acquire()
578

    
579
    shrcnt1 = 5
580
    shrcnt2 = 7
581
    shrcnt3 = 9
582
    shrcnt4 = 2
583

    
584
    # Add acquires using threading.Event for synchronization. They'll be
585
    # acquired exactly in the order defined in this list.
586
    acquires = (shrcnt1 * [(1, "shared 1")] +
587
                3 * [(0, "exclusive 1")] +
588
                shrcnt2 * [(1, "shared 2")] +
589
                shrcnt3 * [(1, "shared 3")] +
590
                shrcnt4 * [(1, "shared 4")] +
591
                3 * [(0, "exclusive 2")])
592

    
593
    ev_cur = None
594
    ev_prev = None
595

    
596
    for args in acquires:
597
      ev_cur = threading.Event()
598
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
599
      ev_prev = ev_cur
600

    
601
    # Wait for last acquire to start
602
    ev_prev.wait()
603

    
604
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
605
    # together
606
    self.assertEqual(self.sl._count_pending(), 7)
607

    
608
    # Release exclusive lock and wait
609
    self.sl.release()
610

    
611
    self._waitThreads()
612

    
613
    # Check sequence
614
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
615
      # Shared locks aren't guaranteed to be notified in order, but they'll be
616
      # first
617
      tmp = self.done.get_nowait()
618
      if tmp == "shared 1":
619
        shrcnt1 -= 1
620
      elif tmp == "shared 2":
621
        shrcnt2 -= 1
622
      elif tmp == "shared 3":
623
        shrcnt3 -= 1
624
      elif tmp == "shared 4":
625
        shrcnt4 -= 1
626
    self.assertEqual(shrcnt1, 0)
627
    self.assertEqual(shrcnt2, 0)
628
    self.assertEqual(shrcnt3, 0)
629
    self.assertEqual(shrcnt3, 0)
630

    
631
    for _ in range(3):
632
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
633

    
634
    for _ in range(3):
635
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
636

    
637
    self.assertRaises(Queue.Empty, self.done.get_nowait)
638

    
639
  def testIllegalDowngrade(self):
640
    # Not yet acquired
641
    self.assertRaises(AssertionError, self.sl.downgrade)
642

    
643
    # Acquire in shared mode, downgrade should be no-op
644
    self.assertTrue(self.sl.acquire(shared=1))
645
    self.assertTrue(self.sl.is_owned(shared=1))
646
    self.assertTrue(self.sl.downgrade())
647
    self.assertTrue(self.sl.is_owned(shared=1))
648
    self.sl.release()
649

    
650
  def testDowngrade(self):
651
    self.assertTrue(self.sl.acquire())
652
    self.assertTrue(self.sl.is_owned(shared=0))
653
    self.assertTrue(self.sl.downgrade())
654
    self.assertTrue(self.sl.is_owned(shared=1))
655
    self.sl.release()
656

    
657
  @_Repeat
658
  def testDowngradeJumpsAheadOfExclusive(self):
659
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
660
      self.assertTrue(self.sl.acquire())
661
      self.assertTrue(self.sl.is_owned(shared=0))
662
      ev_got.set()
663
      ev_downgrade.wait()
664
      self.assertTrue(self.sl.is_owned(shared=0))
665
      self.assertTrue(self.sl.downgrade())
666
      self.assertTrue(self.sl.is_owned(shared=1))
667
      ev_release.wait()
668
      self.assertTrue(self.sl.is_owned(shared=1))
669
      self.sl.release()
670

    
671
    def _KeepExclusive2(ev_started, ev_release):
672
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
673
      self.assertTrue(self.sl.is_owned(shared=0))
674
      ev_release.wait()
675
      self.assertTrue(self.sl.is_owned(shared=0))
676
      self.sl.release()
677

    
678
    def _KeepShared(ev_started, ev_got, ev_release):
679
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
680
      self.assertTrue(self.sl.is_owned(shared=1))
681
      ev_got.set()
682
      ev_release.wait()
683
      self.assertTrue(self.sl.is_owned(shared=1))
684
      self.sl.release()
685

    
686
    # Acquire lock in exclusive mode
687
    ev_got_excl1 = threading.Event()
688
    ev_downgrade_excl1 = threading.Event()
689
    ev_release_excl1 = threading.Event()
690
    th_excl1 = self._addThread(target=_KeepExclusive,
691
                               args=(ev_got_excl1, ev_downgrade_excl1,
692
                                     ev_release_excl1))
693
    ev_got_excl1.wait()
694

    
695
    # Start a second exclusive acquire
696
    ev_started_excl2 = threading.Event()
697
    ev_release_excl2 = threading.Event()
698
    th_excl2 = self._addThread(target=_KeepExclusive2,
699
                               args=(ev_started_excl2, ev_release_excl2))
700
    ev_started_excl2.wait()
701

    
702
    # Start shared acquires, will jump ahead of second exclusive acquire when
703
    # first exclusive acquire downgrades
704
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
705
    ev_release_shared = threading.Event()
706

    
707
    th_shared = [self._addThread(target=_KeepShared,
708
                                 args=(ev_started, ev_got, ev_release_shared))
709
                 for (ev_started, ev_got) in ev_shared]
710

    
711
    # Wait for all shared acquires to start
712
    for (ev, _) in ev_shared:
713
      ev.wait()
714

    
715
    # Check lock information
716
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
717
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
718
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
719
    self.assertEqual([(pendmode, sorted(waiting))
720
                      for (pendmode, waiting) in pending],
721
                     [("exclusive", [th_excl2.getName()]),
722
                      ("shared", sorted(th.getName() for th in th_shared))])
723

    
724
    # Shared acquires won't start until the exclusive lock is downgraded
725
    ev_downgrade_excl1.set()
726

    
727
    # Wait for all shared acquires to be successful
728
    for (_, ev) in ev_shared:
729
      ev.wait()
730

    
731
    # Check lock information again
732
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
733
                                              query.LQ_PENDING])),
734
                     [(self.sl.name, "shared", None,
735
                       [("exclusive", [th_excl2.getName()])])])
736
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
737
    self.assertEqual(set(owner), set([th_excl1.getName()] +
738
                                     [th.getName() for th in th_shared]))
739

    
740
    ev_release_excl1.set()
741
    ev_release_excl2.set()
742
    ev_release_shared.set()
743

    
744
    self._waitThreads()
745

    
746
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
747
                                              query.LQ_PENDING])),
748
                     [(self.sl.name, None, None, [])])
749

    
750
  @_Repeat
751
  def testMixedAcquireTimeout(self):
752
    sync = threading.Event()
753

    
754
    def _AcquireShared(ev):
755
      if not self.sl.acquire(shared=1, timeout=None):
756
        return
757

    
758
      self.done.put("shared")
759

    
760
      # Notify main thread
761
      ev.set()
762

    
763
      # Wait for notification from main thread
764
      sync.wait()
765

    
766
      # Release lock
767
      self.sl.release()
768

    
769
    acquires = []
770
    for _ in range(3):
771
      ev = threading.Event()
772
      self._addThread(target=_AcquireShared, args=(ev, ))
773
      acquires.append(ev)
774

    
775
    # Wait for all acquires to finish
776
    for i in acquires:
777
      i.wait()
778

    
779
    self.assertEqual(self.sl._count_pending(), 0)
780

    
781
    # Try to get exclusive lock
782
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
783

    
784
    # Acquire exclusive without timeout
785
    exclsync = threading.Event()
786
    exclev = threading.Event()
787

    
788
    def _AcquireExclusive():
789
      if not self.sl.acquire(shared=0):
790
        return
791

    
792
      self.done.put("exclusive")
793

    
794
      # Notify main thread
795
      exclev.set()
796

    
797
      # Wait for notification from main thread
798
      exclsync.wait()
799

    
800
      self.sl.release()
801

    
802
    self._addThread(target=_AcquireExclusive)
803

    
804
    # Try to get exclusive lock
805
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
806

    
807
    # Make all shared holders release their locks
808
    sync.set()
809

    
810
    # Wait for exclusive acquire to succeed
811
    exclev.wait()
812

    
813
    self.assertEqual(self.sl._count_pending(), 0)
814

    
815
    # Try to get exclusive lock
816
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
817

    
818
    def _AcquireSharedSimple():
819
      if self.sl.acquire(shared=1, timeout=None):
820
        self.done.put("shared2")
821
        self.sl.release()
822

    
823
    for _ in range(10):
824
      self._addThread(target=_AcquireSharedSimple)
825

    
826
    # Tell exclusive lock to release
827
    exclsync.set()
828

    
829
    # Wait for everything to finish
830
    self._waitThreads()
831

    
832
    self.assertEqual(self.sl._count_pending(), 0)
833

    
834
    # Check sequence
835
    for _ in range(3):
836
      self.assertEqual(self.done.get_nowait(), "shared")
837

    
838
    self.assertEqual(self.done.get_nowait(), "exclusive")
839

    
840
    for _ in range(10):
841
      self.assertEqual(self.done.get_nowait(), "shared2")
842

    
843
    self.assertRaises(Queue.Empty, self.done.get_nowait)
844

    
845
  def testPriority(self):
846
    # Acquire in exclusive mode
847
    self.assert_(self.sl.acquire(shared=0))
848

    
849
    # Queue acquires
850
    def _Acquire(prev, next, shared, priority, result):
851
      prev.wait()
852
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
853
      try:
854
        self.done.put(result)
855
      finally:
856
        self.sl.release()
857

    
858
    counter = itertools.count(0)
859
    priorities = range(-20, 30)
860
    first = threading.Event()
861
    prev = first
862

    
863
    # Data structure:
864
    # {
865
    #   priority:
866
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
867
    #      (shared/exclusive, ...),
868
    #      ...,
869
    #     ],
870
    # }
871
    perprio = {}
872

    
873
    # References shared acquire per priority in L{perprio}. Data structure:
874
    # {
875
    #   priority: (shared=1, set(acquire names), set(pending threads)),
876
    # }
877
    prioshared = {}
878

    
879
    for seed in [4979, 9523, 14902, 32440]:
880
      # Use a deterministic random generator
881
      rnd = random.Random(seed)
882
      for priority in [rnd.choice(priorities) for _ in range(30)]:
883
        modes = [0, 1]
884
        rnd.shuffle(modes)
885
        for shared in modes:
886
          # Unique name
887
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
888

    
889
          ev = threading.Event()
890
          thread = self._addThread(target=_Acquire,
891
                                   args=(prev, ev, shared, priority, acqname))
892
          prev = ev
893

    
894
          # Record expected aqcuire, see above for structure
895
          data = (shared, set([acqname]), set([thread]))
896
          priolist = perprio.setdefault(priority, [])
897
          if shared:
898
            priosh = prioshared.get(priority, None)
899
            if priosh:
900
              # Shared acquires are merged
901
              for i, j in zip(priosh[1:], data[1:]):
902
                i.update(j)
903
              assert data[0] == priosh[0]
904
            else:
905
              prioshared[priority] = data
906
              priolist.append(data)
907
          else:
908
            priolist.append(data)
909

    
910
    # Start all acquires and wait for them
911
    first.set()
912
    prev.wait()
913

    
914
    # Check lock information
915
    self.assertEqual(self.sl.GetLockInfo(set()),
916
                     [(self.sl.name, None, None, None)])
917
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
918
                     [(self.sl.name, "exclusive",
919
                       [threading.currentThread().getName()], None)])
920

    
921
    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
922
                            perprio)
923

    
924
    # Let threads acquire the lock
925
    self.sl.release()
926

    
927
    # Wait for everything to finish
928
    self._waitThreads()
929

    
930
    self.assert_(self.sl._check_empty())
931

    
932
    # Check acquires by priority
933
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
934
      for (_, names, _) in acquires:
935
        # For shared acquires, the set will contain 1..n entries. For exclusive
936
        # acquires only one.
937
        while names:
938
          names.remove(self.done.get_nowait())
939
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
940

    
941
    self.assertRaises(Queue.Empty, self.done.get_nowait)
942

    
943
  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
944
    self.assertEqual(name, self.sl.name)
945
    self.assert_(mode is None)
946
    self.assert_(owner is None)
947

    
948
    self.assertEqual([(pendmode, sorted(waiting))
949
                      for (pendmode, waiting) in pending],
950
                     [(["exclusive", "shared"][int(bool(shared))],
951
                       sorted(t.getName() for t in threads))
952
                      for acquires in [perprio[i]
953
                                       for i in sorted(perprio.keys())]
954
                      for (shared, _, threads) in acquires])
955

    
956
  class _FakeTimeForSpuriousNotifications:
957
    def __init__(self, now, check_end):
958
      self.now = now
959
      self.check_end = check_end
960

    
961
      # Deterministic random number generator
962
      self.rnd = random.Random(15086)
963

    
964
    def time(self):
965
      # Advance time if the random number generator thinks so (this is to test
966
      # multiple notifications without advancing the time)
967
      if self.rnd.random() < 0.3:
968
        self.now += self.rnd.random()
969

    
970
      self.check_end(self.now)
971

    
972
      return self.now
973

    
974
  @_Repeat
975
  def testAcquireTimeoutWithSpuriousNotifications(self):
976
    ready = threading.Event()
977
    locked = threading.Event()
978
    req = Queue.Queue(0)
979

    
980
    epoch = 4000.0
981
    timeout = 60.0
982

    
983
    def check_end(now):
984
      self.assertFalse(locked.isSet())
985

    
986
      # If we waited long enough (in virtual time), tell main thread to release
987
      # lock, otherwise tell it to notify once more
988
      req.put(now < (epoch + (timeout * 0.8)))
989

    
990
    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
991

    
992
    sl = locking.SharedLock("test", _time_fn=time_fn)
993

    
994
    # Acquire in exclusive mode
995
    sl.acquire(shared=0)
996

    
997
    def fn():
998
      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
999
                                 test_notify=ready.set))
1000
      locked.set()
1001
      sl.release()
1002
      self.done.put("success")
1003

    
1004
    # Start acquire with timeout and wait for it to be ready
1005
    self._addThread(target=fn)
1006
    ready.wait()
1007

    
1008
    # The separate thread is now waiting to acquire the lock, so start sending
1009
    # spurious notifications.
1010

    
1011
    # Wait for separate thread to ask for another notification
1012
    count = 0
1013
    while req.get():
1014
      # After sending the notification, the lock will take a short amount of
1015
      # time to notice and to retrieve the current time
1016
      sl._notify_topmost()
1017
      count += 1
1018

    
1019
    self.assertTrue(count > 100, "Not enough notifications were sent")
1020

    
1021
    self.assertFalse(locked.isSet())
1022

    
1023
    # Some notifications have been sent, now actually release the lock
1024
    sl.release()
1025

    
1026
    # Wait for lock to be acquired
1027
    locked.wait()
1028

    
1029
    self._waitThreads()
1030

    
1031
    self.assertEqual(self.done.get_nowait(), "success")
1032
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1033

    
1034

    
1035
class TestSharedLockInCondition(_ThreadedTestCase):
1036
  """SharedLock as a condition lock tests"""
1037

    
1038
  def setUp(self):
1039
    _ThreadedTestCase.setUp(self)
1040
    self.sl = locking.SharedLock("TestSharedLockInCondition")
1041
    self.setCondition()
1042

    
1043
  def setCondition(self):
1044
    self.cond = threading.Condition(self.sl)
1045

    
1046
  def testKeepMode(self):
1047
    self.cond.acquire(shared=1)
1048
    self.assert_(self.sl.is_owned(shared=1))
1049
    self.cond.wait(0)
1050
    self.assert_(self.sl.is_owned(shared=1))
1051
    self.cond.release()
1052
    self.cond.acquire(shared=0)
1053
    self.assert_(self.sl.is_owned(shared=0))
1054
    self.cond.wait(0)
1055
    self.assert_(self.sl.is_owned(shared=0))
1056
    self.cond.release()
1057

    
1058

    
1059
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
1060
  """SharedLock as a pipe condition lock tests"""
1061

    
1062
  def setCondition(self):
1063
    self.cond = locking.PipeCondition(self.sl)
1064

    
1065

    
1066
class TestSSynchronizedDecorator(_ThreadedTestCase):
1067
  """Shared Lock Synchronized decorator test"""
1068

    
1069
  def setUp(self):
1070
    _ThreadedTestCase.setUp(self)
1071

    
1072
  @locking.ssynchronized(_decoratorlock)
1073
  def _doItExclusive(self):
1074
    self.assert_(_decoratorlock.is_owned())
1075
    self.done.put("EXC")
1076

    
1077
  @locking.ssynchronized(_decoratorlock, shared=1)
1078
  def _doItSharer(self):
1079
    self.assert_(_decoratorlock.is_owned(shared=1))
1080
    self.done.put("SHR")
1081

    
1082
  def testDecoratedFunctions(self):
1083
    self._doItExclusive()
1084
    self.assertFalse(_decoratorlock.is_owned())
1085
    self._doItSharer()
1086
    self.assertFalse(_decoratorlock.is_owned())
1087

    
1088
  def testSharersCanCoexist(self):
1089
    _decoratorlock.acquire(shared=1)
1090
    threading.Thread(target=self._doItSharer).start()
1091
    self.assert_(self.done.get(True, 1))
1092
    _decoratorlock.release()
1093

    
1094
  @_Repeat
1095
  def testExclusiveBlocksExclusive(self):
1096
    _decoratorlock.acquire()
1097
    self._addThread(target=self._doItExclusive)
1098
    # give it a bit of time to check that it's not actually doing anything
1099
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1100
    _decoratorlock.release()
1101
    self._waitThreads()
1102
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
1103

    
1104
  @_Repeat
1105
  def testExclusiveBlocksSharer(self):
1106
    _decoratorlock.acquire()
1107
    self._addThread(target=self._doItSharer)
1108
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1109
    _decoratorlock.release()
1110
    self._waitThreads()
1111
    self.failUnlessEqual(self.done.get_nowait(), "SHR")
1112

    
1113
  @_Repeat
1114
  def testSharerBlocksExclusive(self):
1115
    _decoratorlock.acquire(shared=1)
1116
    self._addThread(target=self._doItExclusive)
1117
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1118
    _decoratorlock.release()
1119
    self._waitThreads()
1120
    self.failUnlessEqual(self.done.get_nowait(), "EXC")
1121

    
1122

    
1123
class TestLockSet(_ThreadedTestCase):
1124
  """LockSet tests"""
1125

    
1126
  def setUp(self):
1127
    _ThreadedTestCase.setUp(self)
1128
    self._setUpLS()
1129

    
1130
  def _setUpLS(self):
1131
    """Helper to (re)initialize the lock set"""
1132
    self.resources = ["one", "two", "three"]
1133
    self.ls = locking.LockSet(self.resources, "TestLockSet")
1134

    
1135
  def testResources(self):
1136
    self.assertEquals(self.ls._names(), set(self.resources))
1137
    newls = locking.LockSet([], "TestLockSet.testResources")
1138
    self.assertEquals(newls._names(), set())
1139

    
1140
  def testCheckOwnedUnknown(self):
1141
    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1142
    for shared in [-1, 0, 1, 6378, 24255]:
1143
      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1144
                                           shared=shared))
1145

    
1146
  def testCheckOwnedUnknownWhileHolding(self):
1147
    self.assertFalse(self.ls.check_owned([]))
1148
    self.ls.acquire("one", shared=1)
1149
    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1150
    self.assertTrue(self.ls.check_owned("one", shared=1))
1151
    self.assertFalse(self.ls.check_owned("one", shared=0))
1152
    self.assertFalse(self.ls.check_owned(["one", "two"]))
1153
    self.assertRaises(errors.LockError, self.ls.check_owned,
1154
                      ["one", "nonexist"])
1155
    self.assertRaises(errors.LockError, self.ls.check_owned, "")
1156
    self.ls.release()
1157
    self.assertFalse(self.ls.check_owned([]))
1158
    self.assertFalse(self.ls.check_owned("one"))
1159

    
1160
  def testAcquireRelease(self):
1161
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1162
    self.assert_(self.ls.acquire("one"))
1163
    self.assertEquals(self.ls.list_owned(), set(["one"]))
1164
    self.assertTrue(self.ls.check_owned("one"))
1165
    self.assertTrue(self.ls.check_owned("one", shared=0))
1166
    self.assertFalse(self.ls.check_owned("one", shared=1))
1167
    self.ls.release()
1168
    self.assertEquals(self.ls.list_owned(), set())
1169
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1170
    self.assertEquals(self.ls.acquire(["one"]), set(["one"]))
1171
    self.assertEquals(self.ls.list_owned(), set(["one"]))
1172
    self.ls.release()
1173
    self.assertEquals(self.ls.list_owned(), set())
1174
    self.ls.acquire(["one", "two", "three"])
1175
    self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1176
    self.assertTrue(self.ls.check_owned(self.ls._names()))
1177
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1178
    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1179
    self.ls.release("one")
1180
    self.assertFalse(self.ls.check_owned(["one"]))
1181
    self.assertTrue(self.ls.check_owned(["two", "three"]))
1182
    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1183
    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1184
    self.assertEquals(self.ls.list_owned(), set(["two", "three"]))
1185
    self.ls.release(["three"])
1186
    self.assertEquals(self.ls.list_owned(), set(["two"]))
1187
    self.ls.release()
1188
    self.assertEquals(self.ls.list_owned(), set())
1189
    self.assertEquals(self.ls.acquire(["one", "three"]), set(["one", "three"]))
1190
    self.assertEquals(self.ls.list_owned(), set(["one", "three"]))
1191
    self.ls.release()
1192
    self.assertEquals(self.ls.list_owned(), set())
1193
    for name in self.ls._names():
1194
      self.assertFalse(self.ls.check_owned(name))
1195

    
1196
  def testNoDoubleAcquire(self):
1197
    self.ls.acquire("one")
1198
    self.assertRaises(AssertionError, self.ls.acquire, "one")
1199
    self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1200
    self.assertRaises(AssertionError, self.ls.acquire, ["two", "three"])
1201
    self.ls.release()
1202
    self.ls.acquire(["one", "three"])
1203
    self.ls.release("one")
1204
    self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1205
    self.ls.release("three")
1206

    
1207
  def testNoWrongRelease(self):
1208
    self.assertRaises(AssertionError, self.ls.release)
1209
    self.ls.acquire("one")
1210
    self.assertRaises(AssertionError, self.ls.release, "two")
1211

    
1212
  def testAddRemove(self):
1213
    self.ls.add("four")
1214
    self.assertEquals(self.ls.list_owned(), set())
1215
    self.assert_("four" in self.ls._names())
1216
    self.ls.add(["five", "six", "seven"], acquired=1)
1217
    self.assert_("five" in self.ls._names())
1218
    self.assert_("six" in self.ls._names())
1219
    self.assert_("seven" in self.ls._names())
1220
    self.assertEquals(self.ls.list_owned(), set(["five", "six", "seven"]))
1221
    self.assertEquals(self.ls.remove(["five", "six"]), ["five", "six"])
1222
    self.assert_("five" not in self.ls._names())
1223
    self.assert_("six" not in self.ls._names())
1224
    self.assertEquals(self.ls.list_owned(), set(["seven"]))
1225
    self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
1226
    self.ls.remove("seven")
1227
    self.assert_("seven" not in self.ls._names())
1228
    self.assertEquals(self.ls.list_owned(), set([]))
1229
    self.ls.acquire(None, shared=1)
1230
    self.assertRaises(AssertionError, self.ls.add, "eight")
1231
    self.ls.release()
1232
    self.ls.acquire(None)
1233
    self.ls.add("eight", acquired=1)
1234
    self.assert_("eight" in self.ls._names())
1235
    self.assert_("eight" in self.ls.list_owned())
1236
    self.ls.add("nine")
1237
    self.assert_("nine" in self.ls._names())
1238
    self.assert_("nine" not in self.ls.list_owned())
1239
    self.ls.release()
1240
    self.ls.remove(["two"])
1241
    self.assert_("two" not in self.ls._names())
1242
    self.ls.acquire("three")
1243
    self.assertEquals(self.ls.remove(["three"]), ["three"])
1244
    self.assert_("three" not in self.ls._names())
1245
    self.assertEquals(self.ls.remove("three"), [])
1246
    self.assertEquals(self.ls.remove(["one", "three", "six"]), ["one"])
1247
    self.assert_("one" not in self.ls._names())
1248

    
1249
  def testRemoveNonBlocking(self):
1250
    self.ls.acquire("one")
1251
    self.assertEquals(self.ls.remove("one"), ["one"])
1252
    self.ls.acquire(["two", "three"])
1253
    self.assertEquals(self.ls.remove(["two", "three"]),
1254
                      ["two", "three"])
1255

    
1256
  def testNoDoubleAdd(self):
1257
    self.assertRaises(errors.LockError, self.ls.add, "two")
1258
    self.ls.add("four")
1259
    self.assertRaises(errors.LockError, self.ls.add, "four")
1260

    
1261
  def testNoWrongRemoves(self):
1262
    self.ls.acquire(["one", "three"], shared=1)
1263
    # Cannot remove "two" while holding something which is not a superset
1264
    self.assertRaises(AssertionError, self.ls.remove, "two")
1265
    # Cannot remove "three" as we are sharing it
1266
    self.assertRaises(AssertionError, self.ls.remove, "three")
1267

    
1268
  def testAcquireSetLock(self):
1269
    # acquire the set-lock exclusively
1270
    self.assertEquals(self.ls.acquire(None), set(["one", "two", "three"]))
1271
    self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1272
    self.assertEquals(self.ls.is_owned(), True)
1273
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1274
    # I can still add/remove elements...
1275
    self.assertEquals(self.ls.remove(["two", "three"]), ["two", "three"])
1276
    self.assert_(self.ls.add("six"))
1277
    self.ls.release()
1278
    # share the set-lock
1279
    self.assertEquals(self.ls.acquire(None, shared=1), set(["one", "six"]))
1280
    # adding new elements is not possible
1281
    self.assertRaises(AssertionError, self.ls.add, "five")
1282
    self.ls.release()
1283

    
1284
  def testAcquireWithRepetitions(self):
1285
    self.assertEquals(self.ls.acquire(["two", "two", "three"], shared=1),
1286
                      set(["two", "two", "three"]))
1287
    self.ls.release(["two", "two"])
1288
    self.assertEquals(self.ls.list_owned(), set(["three"]))
1289

    
1290
  def testEmptyAcquire(self):
1291
    # Acquire an empty list of locks...
1292
    self.assertEquals(self.ls.acquire([]), set())
1293
    self.assertEquals(self.ls.list_owned(), set())
1294
    # New locks can still be addded
1295
    self.assert_(self.ls.add("six"))
1296
    # "re-acquiring" is not an issue, since we had really acquired nothing
1297
    self.assertEquals(self.ls.acquire([], shared=1), set())
1298
    self.assertEquals(self.ls.list_owned(), set())
1299
    # We haven't really acquired anything, so we cannot release
1300
    self.assertRaises(AssertionError, self.ls.release)
1301

    
1302
  def _doLockSet(self, names, shared):
1303
    try:
1304
      self.ls.acquire(names, shared=shared)
1305
      self.done.put("DONE")
1306
      self.ls.release()
1307
    except errors.LockError:
1308
      self.done.put("ERR")
1309

    
1310
  def _doAddSet(self, names):
1311
    try:
1312
      self.ls.add(names, acquired=1)
1313
      self.done.put("DONE")
1314
      self.ls.release()
1315
    except errors.LockError:
1316
      self.done.put("ERR")
1317

    
1318
  def _doRemoveSet(self, names):
1319
    self.done.put(self.ls.remove(names))
1320

    
1321
  @_Repeat
1322
  def testConcurrentSharedAcquire(self):
1323
    self.ls.acquire(["one", "two"], shared=1)
1324
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1325
    self._waitThreads()
1326
    self.assertEqual(self.done.get_nowait(), "DONE")
1327
    self._addThread(target=self._doLockSet, args=(["one", "two", "three"], 1))
1328
    self._waitThreads()
1329
    self.assertEqual(self.done.get_nowait(), "DONE")
1330
    self._addThread(target=self._doLockSet, args=("three", 1))
1331
    self._waitThreads()
1332
    self.assertEqual(self.done.get_nowait(), "DONE")
1333
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1334
    self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1335
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1336
    self.ls.release()
1337
    self._waitThreads()
1338
    self.assertEqual(self.done.get_nowait(), "DONE")
1339
    self.assertEqual(self.done.get_nowait(), "DONE")
1340

    
1341
  @_Repeat
1342
  def testConcurrentExclusiveAcquire(self):
1343
    self.ls.acquire(["one", "two"])
1344
    self._addThread(target=self._doLockSet, args=("three", 1))
1345
    self._waitThreads()
1346
    self.assertEqual(self.done.get_nowait(), "DONE")
1347
    self._addThread(target=self._doLockSet, args=("three", 0))
1348
    self._waitThreads()
1349
    self.assertEqual(self.done.get_nowait(), "DONE")
1350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1351
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1352
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1353
    self._addThread(target=self._doLockSet, args=("one", 0))
1354
    self._addThread(target=self._doLockSet, args=("one", 1))
1355
    self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1356
    self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
1357
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1358
    self.ls.release()
1359
    self._waitThreads()
1360
    for _ in range(6):
1361
      self.failUnlessEqual(self.done.get_nowait(), "DONE")
1362

    
1363
  @_Repeat
1364
  def testSimpleAcquireTimeoutExpiring(self):
1365
    names = sorted(self.ls._names())
1366
    self.assert_(len(names) >= 3)
1367

    
1368
    # Get name of first lock
1369
    first = names[0]
1370

    
1371
    # Get name of last lock
1372
    last = names.pop()
1373

    
1374
    checks = [
1375
      # Block first and try to lock it again
1376
      (first, first),
1377

    
1378
      # Block last and try to lock all locks
1379
      (None, first),
1380

    
1381
      # Block last and try to lock it again
1382
      (last, last),
1383
      ]
1384

    
1385
    for (wanted, block) in checks:
1386
      # Lock in exclusive mode
1387
      self.assert_(self.ls.acquire(block, shared=0))
1388

    
1389
      def _AcquireOne():
1390
        # Try to get the same lock again with a timeout (should never succeed)
1391
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1392
        if acquired:
1393
          self.done.put("acquired")
1394
          self.ls.release()
1395
        else:
1396
          self.assert_(acquired is None)
1397
          self.assertFalse(self.ls.list_owned())
1398
          self.assertFalse(self.ls.is_owned())
1399
          self.done.put("not acquired")
1400

    
1401
      self._addThread(target=_AcquireOne)
1402

    
1403
      # Wait for timeout in thread to expire
1404
      self._waitThreads()
1405

    
1406
      # Release exclusive lock again
1407
      self.ls.release()
1408

    
1409
      self.assertEqual(self.done.get_nowait(), "not acquired")
1410
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1411

    
1412
  @_Repeat
1413
  def testDelayedAndExpiringLockAcquire(self):
1414
    self._setUpLS()
1415
    self.ls.add(["five", "six", "seven", "eight", "nine"])
1416

    
1417
    for expire in (False, True):
1418
      names = sorted(self.ls._names())
1419
      self.assertEqual(len(names), 8)
1420

    
1421
      lock_ev = dict([(i, threading.Event()) for i in names])
1422

    
1423
      # Lock all in exclusive mode
1424
      self.assert_(self.ls.acquire(names, shared=0))
1425

    
1426
      if expire:
1427
        # We'll wait at least 300ms per lock
1428
        lockwait = len(names) * [0.3]
1429

    
1430
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1431
        # this gives us up to 2.4s to fail.
1432
        lockall_timeout = 0.4
1433
      else:
1434
        # This should finish rather quickly
1435
        lockwait = None
1436
        lockall_timeout = len(names) * 5.0
1437

    
1438
      def _LockAll():
1439
        def acquire_notification(name):
1440
          if not expire:
1441
            self.done.put("getting %s" % name)
1442

    
1443
          # Kick next lock
1444
          lock_ev[name].set()
1445

    
1446
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1447
                           test_notify=acquire_notification):
1448
          self.done.put("got all")
1449
          self.ls.release()
1450
        else:
1451
          self.done.put("timeout on all")
1452

    
1453
        # Notify all locks
1454
        for ev in lock_ev.values():
1455
          ev.set()
1456

    
1457
      t = self._addThread(target=_LockAll)
1458

    
1459
      for idx, name in enumerate(names):
1460
        # Wait for actual acquire on this lock to start
1461
        lock_ev[name].wait(10.0)
1462

    
1463
        if expire and t.isAlive():
1464
          # Wait some time after getting the notification to make sure the lock
1465
          # acquire will expire
1466
          SafeSleep(lockwait[idx])
1467

    
1468
        self.ls.release(names=name)
1469

    
1470
      self.assertFalse(self.ls.list_owned())
1471

    
1472
      self._waitThreads()
1473

    
1474
      if expire:
1475
        # Not checking which locks were actually acquired. Doing so would be
1476
        # too timing-dependant.
1477
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1478
      else:
1479
        for i in names:
1480
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1481
        self.assertEqual(self.done.get_nowait(), "got all")
1482
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1483

    
1484
  @_Repeat
1485
  def testConcurrentRemove(self):
1486
    self.ls.add("four")
1487
    self.ls.acquire(["one", "two", "four"])
1488
    self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
1489
    self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
1490
    self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1491
    self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1492
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1493
    self.ls.remove("one")
1494
    self.ls.release()
1495
    self._waitThreads()
1496
    for i in range(4):
1497
      self.failUnlessEqual(self.done.get_nowait(), "ERR")
1498
    self.ls.add(["five", "six"], acquired=1)
1499
    self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
1500
    self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
1501
    self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
1502
    self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
1503
    self.ls.remove("five")
1504
    self.ls.release()
1505
    self._waitThreads()
1506
    for i in range(4):
1507
      self.failUnlessEqual(self.done.get_nowait(), "DONE")
1508
    self.ls.acquire(["three", "four"])
1509
    self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
1510
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1511
    self.ls.remove("four")
1512
    self._waitThreads()
1513
    self.assertEqual(self.done.get_nowait(), ["six"])
1514
    self._addThread(target=self._doRemoveSet, args=(["two"]))
1515
    self._waitThreads()
1516
    self.assertEqual(self.done.get_nowait(), ["two"])
1517
    self.ls.release()
1518
    # reset lockset
1519
    self._setUpLS()
1520

    
1521
  @_Repeat
1522
  def testConcurrentSharedSetLock(self):
1523
    # share the set-lock...
1524
    self.ls.acquire(None, shared=1)
1525
    # ...another thread can share it too
1526
    self._addThread(target=self._doLockSet, args=(None, 1))
1527
    self._waitThreads()
1528
    self.assertEqual(self.done.get_nowait(), "DONE")
1529
    # ...or just share some elements
1530
    self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
1531
    self._waitThreads()
1532
    self.assertEqual(self.done.get_nowait(), "DONE")
1533
    # ...but not add new ones or remove any
1534
    t = self._addThread(target=self._doAddSet, args=(["nine"]))
1535
    self._addThread(target=self._doRemoveSet, args=(["two"], ))
1536
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1537
    # this just releases the set-lock
1538
    self.ls.release([])
1539
    t.join(60)
1540
    self.assertEqual(self.done.get_nowait(), "DONE")
1541
    # release the lock on the actual elements so remove() can proceed too
1542
    self.ls.release()
1543
    self._waitThreads()
1544
    self.failUnlessEqual(self.done.get_nowait(), ["two"])
1545
    # reset lockset
1546
    self._setUpLS()
1547

    
1548
  @_Repeat
1549
  def testConcurrentExclusiveSetLock(self):
1550
    # acquire the set-lock...
1551
    self.ls.acquire(None, shared=0)
1552
    # ...no one can do anything else
1553
    self._addThread(target=self._doLockSet, args=(None, 1))
1554
    self._addThread(target=self._doLockSet, args=(None, 0))
1555
    self._addThread(target=self._doLockSet, args=(["three"], 0))
1556
    self._addThread(target=self._doLockSet, args=(["two"], 1))
1557
    self._addThread(target=self._doAddSet, args=(["nine"]))
1558
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1559
    self.ls.release()
1560
    self._waitThreads()
1561
    for _ in range(5):
1562
      self.assertEqual(self.done.get(True, 1), "DONE")
1563
    # cleanup
1564
    self._setUpLS()
1565

    
1566
  @_Repeat
1567
  def testConcurrentSetLockAdd(self):
1568
    self.ls.acquire("one")
1569
    # Another thread wants the whole SetLock
1570
    self._addThread(target=self._doLockSet, args=(None, 0))
1571
    self._addThread(target=self._doLockSet, args=(None, 1))
1572
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1573
    self.assertRaises(AssertionError, self.ls.add, "four")
1574
    self.ls.release()
1575
    self._waitThreads()
1576
    self.assertEqual(self.done.get_nowait(), "DONE")
1577
    self.assertEqual(self.done.get_nowait(), "DONE")
1578
    self.ls.acquire(None)
1579
    self._addThread(target=self._doLockSet, args=(None, 0))
1580
    self._addThread(target=self._doLockSet, args=(None, 1))
1581
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1582
    self.ls.add("four")
1583
    self.ls.add("five", acquired=1)
1584
    self.ls.add("six", acquired=1, shared=1)
1585
    self.assertEquals(self.ls.list_owned(),
1586
      set(["one", "two", "three", "five", "six"]))
1587
    self.assertEquals(self.ls.is_owned(), True)
1588
    self.assertEquals(self.ls._names(),
1589
      set(["one", "two", "three", "four", "five", "six"]))
1590
    self.ls.release()
1591
    self._waitThreads()
1592
    self.assertEqual(self.done.get_nowait(), "DONE")
1593
    self.assertEqual(self.done.get_nowait(), "DONE")
1594
    self._setUpLS()
1595

    
1596
  @_Repeat
1597
  def testEmptyLockSet(self):
1598
    # get the set-lock
1599
    self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
1600
    # now empty it...
1601
    self.ls.remove(["one", "two", "three"])
1602
    # and adds/locks by another thread still wait
1603
    self._addThread(target=self._doAddSet, args=(["nine"]))
1604
    self._addThread(target=self._doLockSet, args=(None, 1))
1605
    self._addThread(target=self._doLockSet, args=(None, 0))
1606
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1607
    self.ls.release()
1608
    self._waitThreads()
1609
    for _ in range(3):
1610
      self.assertEqual(self.done.get_nowait(), "DONE")
1611
    # empty it again...
1612
    self.assertEqual(self.ls.remove(["nine"]), ["nine"])
1613
    # now share it...
1614
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1615
    # other sharers can go, adds still wait
1616
    self._addThread(target=self._doLockSet, args=(None, 1))
1617
    self._waitThreads()
1618
    self.assertEqual(self.done.get_nowait(), "DONE")
1619
    self._addThread(target=self._doAddSet, args=(["nine"]))
1620
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1621
    self.ls.release()
1622
    self._waitThreads()
1623
    self.assertEqual(self.done.get_nowait(), "DONE")
1624
    self._setUpLS()
1625

    
1626
  def testAcquireWithNamesDowngrade(self):
1627
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1628
    self.assertTrue(self.ls.is_owned())
1629
    self.assertFalse(self.ls._get_lock().is_owned())
1630
    self.ls.release()
1631
    self.assertFalse(self.ls.is_owned())
1632
    self.assertFalse(self.ls._get_lock().is_owned())
1633
    # Can't downgrade after releasing
1634
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1635

    
1636
  def testDowngrade(self):
1637
    # Not owning anything, must raise an exception
1638
    self.assertFalse(self.ls.is_owned())
1639
    self.assertRaises(AssertionError, self.ls.downgrade)
1640

    
1641
    self.assertFalse(compat.any(i.is_owned()
1642
                                for i in self.ls._get_lockdict().values()))
1643
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1644
    for name in self.ls._names():
1645
      self.assertFalse(self.ls.check_owned(name))
1646

    
1647
    self.assertEquals(self.ls.acquire(None, shared=0),
1648
                      set(["one", "two", "three"]))
1649
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1650

    
1651
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1652
    for name in self.ls._names():
1653
      self.assertTrue(self.ls.check_owned(name))
1654
      self.assertTrue(self.ls.check_owned(name, shared=0))
1655
      self.assertFalse(self.ls.check_owned(name, shared=1))
1656

    
1657
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1658
    self.assertTrue(compat.all(i.is_owned(shared=0)
1659
                               for i in self.ls._get_lockdict().values()))
1660

    
1661
    # Start downgrading locks
1662
    self.assertTrue(self.ls.downgrade(names=["one"]))
1663
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1664
    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1665
                               for name, lock in
1666
                                 self.ls._get_lockdict().items()))
1667

    
1668
    self.assertFalse(self.ls.check_owned("one", shared=0))
1669
    self.assertTrue(self.ls.check_owned("one", shared=1))
1670
    self.assertTrue(self.ls.check_owned("two", shared=0))
1671
    self.assertTrue(self.ls.check_owned("three", shared=0))
1672

    
1673
    # Downgrade second lock
1674
    self.assertTrue(self.ls.downgrade(names="two"))
1675
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1676
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1677
    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1678
                               for name, lock in
1679
                                 self.ls._get_lockdict().items()))
1680

    
1681
    self.assertFalse(self.ls.check_owned("one", shared=0))
1682
    self.assertTrue(self.ls.check_owned("one", shared=1))
1683
    self.assertFalse(self.ls.check_owned("two", shared=0))
1684
    self.assertTrue(self.ls.check_owned("two", shared=1))
1685
    self.assertTrue(self.ls.check_owned("three", shared=0))
1686

    
1687
    # Downgrading the last exclusive lock to shared must downgrade the
1688
    # lockset-internal lock too
1689
    self.assertTrue(self.ls.downgrade(names="three"))
1690
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1691
    self.assertTrue(compat.all(i.is_owned(shared=1)
1692
                               for i in self.ls._get_lockdict().values()))
1693

    
1694
    # Verify owned locks
1695
    for name in self.ls._names():
1696
      self.assertTrue(self.ls.check_owned(name, shared=1))
1697

    
1698
    # Downgrading a shared lock must be a no-op
1699
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1700
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1701
    self.assertTrue(compat.all(i.is_owned(shared=1)
1702
                               for i in self.ls._get_lockdict().values()))
1703

    
1704
    self.ls.release()
1705

    
1706
  def testDowngradeEverything(self):
1707
    self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
1708
                     set(["one", "two", "three"]))
1709

    
1710
    # Ensure all locks are now owned in exclusive mode
1711
    for name in self.ls._names():
1712
      self.assertTrue(self.ls.check_owned(name, shared=0))
1713

    
1714
    # Downgrade everything
1715
    self.assertTrue(self.ls.downgrade())
1716

    
1717
    # Ensure all locks are now owned in shared mode
1718
    for name in self.ls._names():
1719
      self.assertTrue(self.ls.check_owned(name, shared=1))
1720

    
1721
  def testPriority(self):
1722
    def _Acquire(prev, next, name, priority, success_fn):
1723
      prev.wait()
1724
      self.assert_(self.ls.acquire(name, shared=0,
1725
                                   priority=priority,
1726
                                   test_notify=lambda _: next.set()))
1727
      try:
1728
        success_fn()
1729
      finally:
1730
        self.ls.release()
1731

    
1732
    # Get all in exclusive mode
1733
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1734

    
1735
    done_two = Queue.Queue(0)
1736

    
1737
    first = threading.Event()
1738
    prev = first
1739

    
1740
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1741
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1742

    
1743
    # Use a deterministic random generator
1744
    random.Random(741).shuffle(acquires)
1745

    
1746
    for (name, prio, done) in acquires:
1747
      ev = threading.Event()
1748
      self._addThread(target=_Acquire,
1749
                      args=(prev, ev, name, prio,
1750
                            compat.partial(done.put, "Prio%s" % prio)))
1751
      prev = ev
1752

    
1753
    # Start acquires
1754
    first.set()
1755

    
1756
    # Wait for last acquire to start
1757
    prev.wait()
1758

    
1759
    # Let threads acquire locks
1760
    self.ls.release()
1761

    
1762
    # Wait for threads to finish
1763
    self._waitThreads()
1764

    
1765
    for i in range(1, 33):
1766
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1767
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1768

    
1769
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1770
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1771

    
1772

    
1773
class TestGanetiLockManager(_ThreadedTestCase):
1774
  def setUp(self):
1775
    _ThreadedTestCase.setUp(self)
1776
    self.nodes = ["n1", "n2"]
1777
    self.nodegroups = ["g1", "g2"]
1778
    self.instances = ["i1", "i2", "i3"]
1779
    self.networks = ["net1", "net2", "net3"]
1780
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1781
                                        self.instances, self.networks)
1782

    
1783
  def tearDown(self):
1784
    # Don't try this at home...
1785
    locking.GanetiLockManager._instance = None
1786

    
1787
  def testLockingConstants(self):
1788
    # The locking library internally cheats by assuming its constants have some
1789
    # relationships with each other. Check those hold true.
1790
    # This relationship is also used in the Processor to recursively acquire
1791
    # the right locks. Again, please don't break it.
1792
    for i in range(len(locking.LEVELS)):
1793
      self.assertEqual(i, locking.LEVELS[i])
1794

    
1795
  def testDoubleGLFails(self):
1796
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
1797

    
1798
  def testLockNames(self):
1799
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1800
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1801
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1802
                     set(self.nodegroups))
1803
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1804
                     set(self.instances))
1805
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
1806
                     set(self.networks))
1807

    
1808
  def testInitAndResources(self):
1809
    locking.GanetiLockManager._instance = None
1810
    self.GL = locking.GanetiLockManager([], [], [], [])
1811
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1812
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1813
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1814
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1815
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
1816

    
1817
    locking.GanetiLockManager._instance = None
1818
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
1819
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1820
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1821
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1822
                                    set(self.nodegroups))
1823
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1824
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
1825

    
1826
    locking.GanetiLockManager._instance = None
1827
    self.GL = locking.GanetiLockManager([], [], self.instances, [])
1828
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1829
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1830
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1831
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1832
                     set(self.instances))
1833

    
1834
    locking.GanetiLockManager._instance = None
1835
    self.GL = locking.GanetiLockManager([], [], [], self.networks)
1836
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1837
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1838
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1839
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1840
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
1841
                     set(self.networks))
1842

    
1843
  def testAcquireRelease(self):
1844
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1845
    self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
1846
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
1847
    self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
1848
    self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
1849
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
1850
                                        shared=1))
1851
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
1852
    self.GL.release(locking.LEVEL_NODE, ["n2"])
1853
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
1854
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
1855
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
1856
    self.GL.release(locking.LEVEL_NODE)
1857
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
1858
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
1859
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
1860
    self.GL.release(locking.LEVEL_NODEGROUP)
1861
    self.GL.release(locking.LEVEL_INSTANCE)
1862
    self.assertRaises(errors.LockError, self.GL.acquire,
1863
                      locking.LEVEL_INSTANCE, ["i5"])
1864
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
1865
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))
1866

    
1867
  def testAcquireWholeSets(self):
1868
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1869
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1870
                      set(self.instances))
1871
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1872
                      set(self.instances))
1873
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1874
                      set(self.nodegroups))
1875
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
1876
                      set(self.nodegroups))
1877
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1878
                      set(self.nodes))
1879
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1880
                      set(self.nodes))
1881
    self.GL.release(locking.LEVEL_NODE)
1882
    self.GL.release(locking.LEVEL_NODEGROUP)
1883
    self.GL.release(locking.LEVEL_INSTANCE)
1884
    self.GL.release(locking.LEVEL_CLUSTER)
1885

    
1886
  def testAcquireWholeAndPartial(self):
1887
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1888
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1889
                      set(self.instances))
1890
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1891
                      set(self.instances))
1892
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
1893
                      set(["n2"]))
1894
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1895
                      set(["n2"]))
1896
    self.GL.release(locking.LEVEL_NODE)
1897
    self.GL.release(locking.LEVEL_INSTANCE)
1898
    self.GL.release(locking.LEVEL_CLUSTER)
1899

    
1900
  def testBGLDependency(self):
1901
    self.assertRaises(AssertionError, self.GL.acquire,
1902
                      locking.LEVEL_NODE, ["n1", "n2"])
1903
    self.assertRaises(AssertionError, self.GL.acquire,
1904
                      locking.LEVEL_INSTANCE, ["i3"])
1905
    self.assertRaises(AssertionError, self.GL.acquire,
1906
                      locking.LEVEL_NODEGROUP, ["g1"])
1907
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1908
    self.GL.acquire(locking.LEVEL_NODE, ["n1"])
1909
    self.assertRaises(AssertionError, self.GL.release,
1910
                      locking.LEVEL_CLUSTER, ["BGL"])
1911
    self.assertRaises(AssertionError, self.GL.release,
1912
                      locking.LEVEL_CLUSTER)
1913
    self.GL.release(locking.LEVEL_NODE)
1914
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
1915
    self.assertRaises(AssertionError, self.GL.release,
1916
                      locking.LEVEL_CLUSTER, ["BGL"])
1917
    self.assertRaises(AssertionError, self.GL.release,
1918
                      locking.LEVEL_CLUSTER)
1919
    self.GL.release(locking.LEVEL_INSTANCE)
1920
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1921
    self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
1922
    self.assertRaises(AssertionError, self.GL.release,
1923
                      locking.LEVEL_CLUSTER, ["BGL"])
1924
    self.assertRaises(AssertionError, self.GL.release,
1925
                      locking.LEVEL_CLUSTER)
1926
    self.GL.release(locking.LEVEL_NODEGROUP)
1927
    self.GL.release(locking.LEVEL_CLUSTER)
1928

    
1929
  def testWrongOrder(self):
1930
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1931
    self.GL.acquire(locking.LEVEL_NODE, ["n2"])
1932
    self.assertRaises(AssertionError, self.GL.acquire,
1933
                      locking.LEVEL_NODE, ["n1"])
1934
    self.assertRaises(AssertionError, self.GL.acquire,
1935
                      locking.LEVEL_NODEGROUP, ["g1"])
1936
    self.assertRaises(AssertionError, self.GL.acquire,
1937
                      locking.LEVEL_INSTANCE, ["i2"])
1938

    
1939
  def testModifiableLevels(self):
1940
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1941
                      ["BGL2"])
1942
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
1943
    self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
1944
    self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
1945
    self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
1946
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
1947
    self.GL.add(locking.LEVEL_NODE, ["n3"])
1948
    self.GL.remove(locking.LEVEL_NODE, ["n1"])
1949
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
1950
    self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
1951
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
1952
    self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
1953
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))
1954
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1955
                      ["BGL2"])
1956

    
1957
  # Helper function to run as a thread that shared the BGL and then acquires
1958
  # some locks at another level.
1959
  def _doLock(self, level, names, shared):
1960
    try:
1961
      self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1962
      self.GL.acquire(level, names, shared=shared)
1963
      self.done.put("DONE")
1964
      self.GL.release(level)
1965
      self.GL.release(locking.LEVEL_CLUSTER)
1966
    except errors.LockError:
1967
      self.done.put("ERR")
1968

    
1969
  @_Repeat
1970
  def testConcurrency(self):
1971
    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1972
    self._addThread(target=self._doLock,
1973
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
1974
    self._waitThreads()
1975
    self.assertEqual(self.done.get_nowait(), "DONE")
1976
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
1977
    self._addThread(target=self._doLock,
1978
                    args=(locking.LEVEL_INSTANCE, "i1", 1))
1979
    self._waitThreads()
1980
    self.assertEqual(self.done.get_nowait(), "DONE")
1981
    self._addThread(target=self._doLock,
1982
                    args=(locking.LEVEL_INSTANCE, "i3", 1))
1983
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1984
    self.GL.release(locking.LEVEL_INSTANCE)
1985
    self._waitThreads()
1986
    self.assertEqual(self.done.get_nowait(), "DONE")
1987
    self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
1988
    self._addThread(target=self._doLock,
1989
                    args=(locking.LEVEL_INSTANCE, "i2", 1))
1990
    self._waitThreads()
1991
    self.assertEqual(self.done.get_nowait(), "DONE")
1992
    self._addThread(target=self._doLock,
1993
                    args=(locking.LEVEL_INSTANCE, "i2", 0))
1994
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1995
    self.GL.release(locking.LEVEL_INSTANCE)
1996
    self._waitThreads()
1997
    self.assertEqual(self.done.get(True, 1), "DONE")
1998
    self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
1999

    
2000

    
2001
class TestLockMonitor(_ThreadedTestCase):
2002
  def setUp(self):
2003
    _ThreadedTestCase.setUp(self)
2004
    self.lm = locking.LockMonitor()
2005

    
2006
  def testSingleThread(self):
2007
    locks = []
2008

    
2009
    for i in range(100):
2010
      name = "TestLock%s" % i
2011
      locks.append(locking.SharedLock(name, monitor=self.lm))
2012

    
2013
    self.assertEqual(len(self.lm._locks), len(locks))
2014
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
2015
    self.assertEqual(len(result.fields), 1)
2016
    self.assertEqual(len(result.data), 100)
2017

    
2018
    # Delete all locks
2019
    del locks[:]
2020

    
2021
    # The garbage collector might needs some time
2022
    def _CheckLocks():
2023
      if self.lm._locks:
2024
        raise utils.RetryAgain()
2025

    
2026
    utils.Retry(_CheckLocks, 0.1, 30.0)
2027

    
2028
    self.assertFalse(self.lm._locks)
2029

    
2030
  def testMultiThread(self):
2031
    locks = []
2032

    
2033
    def _CreateLock(prev, next, name):
2034
      prev.wait()
2035
      locks.append(locking.SharedLock(name, monitor=self.lm))
2036
      if next:
2037
        next.set()
2038

    
2039
    expnames = []
2040

    
2041
    first = threading.Event()
2042
    prev = first
2043

    
2044
    # Use a deterministic random generator
2045
    for i in random.Random(4263).sample(range(100), 33):
2046
      name = "MtTestLock%s" % i
2047
      expnames.append(name)
2048

    
2049
      ev = threading.Event()
2050
      self._addThread(target=_CreateLock, args=(prev, ev, name))
2051
      prev = ev
2052

    
2053
    # Add locks
2054
    first.set()
2055
    self._waitThreads()
2056

    
2057
    # Check order in which locks were added
2058
    self.assertEqual([i.name for i in locks], expnames)
2059

    
2060
    # Check query result
2061
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2062
    self.assert_(isinstance(result, dict))
2063
    response = objects.QueryResponse.FromDict(result)
2064
    self.assertEqual(response.data,
2065
                     [[(constants.RS_NORMAL, name),
2066
                       (constants.RS_NORMAL, None),
2067
                       (constants.RS_NORMAL, None),
2068
                       (constants.RS_NORMAL, [])]
2069
                      for name in utils.NiceSort(expnames)])
2070
    self.assertEqual(len(response.fields), 4)
2071
    self.assertEqual(["name", "mode", "owner", "pending"],
2072
                     [fdef.name for fdef in response.fields])
2073

    
2074
    # Test exclusive acquire
2075
    for tlock in locks[::4]:
2076
      tlock.acquire(shared=0)
2077
      try:
2078
        def _GetExpResult(name):
2079
          if tlock.name == name:
2080
            return [(constants.RS_NORMAL, name),
2081
                    (constants.RS_NORMAL, "exclusive"),
2082
                    (constants.RS_NORMAL,
2083
                     [threading.currentThread().getName()]),
2084
                    (constants.RS_NORMAL, [])]
2085
          return [(constants.RS_NORMAL, name),
2086
                  (constants.RS_NORMAL, None),
2087
                  (constants.RS_NORMAL, None),
2088
                  (constants.RS_NORMAL, [])]
2089

    
2090
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2091
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2092
                         [_GetExpResult(name)
2093
                          for name in utils.NiceSort(expnames)])
2094
      finally:
2095
        tlock.release()
2096

    
2097
    # Test shared acquire
2098
    def _Acquire(lock, shared, ev, notify):
2099
      lock.acquire(shared=shared)
2100
      try:
2101
        notify.set()
2102
        ev.wait()
2103
      finally:
2104
        lock.release()
2105

    
2106
    for tlock1 in locks[::11]:
2107
      for tlock2 in locks[::-15]:
2108
        if tlock2 == tlock1:
2109
          # Avoid deadlocks
2110
          continue
2111

    
2112
        for tlock3 in locks[::10]:
2113
          if tlock3 in (tlock2, tlock1):
2114
            # Avoid deadlocks
2115
            continue
2116

    
2117
          releaseev = threading.Event()
2118

    
2119
          # Acquire locks
2120
          acquireev = []
2121
          tthreads1 = []
2122
          for i in range(3):
2123
            ev = threading.Event()
2124
            tthreads1.append(self._addThread(target=_Acquire,
2125
                                             args=(tlock1, 1, releaseev, ev)))
2126
            acquireev.append(ev)
2127

    
2128
          ev = threading.Event()
2129
          tthread2 = self._addThread(target=_Acquire,
2130
                                     args=(tlock2, 1, releaseev, ev))
2131
          acquireev.append(ev)
2132

    
2133
          ev = threading.Event()
2134
          tthread3 = self._addThread(target=_Acquire,
2135
                                     args=(tlock3, 0, releaseev, ev))
2136
          acquireev.append(ev)
2137

    
2138
          # Wait for all locks to be acquired
2139
          for i in acquireev:
2140
            i.wait()
2141

    
2142
          # Check query result
2143
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2144
          response = objects.QueryResponse.FromDict(result)
2145
          for (name, mode, owner) in response.data:
2146
            (name_status, name_value) = name
2147
            (owner_status, owner_value) = owner
2148

    
2149
            self.assertEqual(name_status, constants.RS_NORMAL)
2150
            self.assertEqual(owner_status, constants.RS_NORMAL)
2151

    
2152
            if name_value == tlock1.name:
2153
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2154
              self.assertEqual(set(owner_value),
2155
                               set(i.getName() for i in tthreads1))
2156
              continue
2157

    
2158
            if name_value == tlock2.name:
2159
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2160
              self.assertEqual(owner_value, [tthread2.getName()])
2161
              continue
2162

    
2163
            if name_value == tlock3.name:
2164
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
2165
              self.assertEqual(owner_value, [tthread3.getName()])
2166
              continue
2167

    
2168
            self.assert_(name_value in expnames)
2169
            self.assertEqual(mode, (constants.RS_NORMAL, None))
2170
            self.assert_(owner_value is None)
2171

    
2172
          # Release locks again
2173
          releaseev.set()
2174

    
2175
          self._waitThreads()
2176

    
2177
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2178
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
2179
                           [[(constants.RS_NORMAL, name),
2180
                             (constants.RS_NORMAL, None),
2181
                             (constants.RS_NORMAL, None)]
2182
                            for name in utils.NiceSort(expnames)])
2183

    
2184
  def testDelete(self):
2185
    lock = locking.SharedLock("TestLock", monitor=self.lm)
2186

    
2187
    self.assertEqual(len(self.lm._locks), 1)
2188
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2189
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2190
                     [[(constants.RS_NORMAL, lock.name),
2191
                       (constants.RS_NORMAL, None),
2192
                       (constants.RS_NORMAL, None)]])
2193

    
2194
    lock.delete()
2195

    
2196
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2197
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2198
                     [[(constants.RS_NORMAL, lock.name),
2199
                       (constants.RS_NORMAL, "deleted"),
2200
                       (constants.RS_NORMAL, None)]])
2201
    self.assertEqual(len(self.lm._locks), 1)
2202

    
2203
  def testPending(self):
2204
    def _Acquire(lock, shared, prev, next):
2205
      prev.wait()
2206

    
2207
      lock.acquire(shared=shared, test_notify=next.set)
2208
      try:
2209
        pass
2210
      finally:
2211
        lock.release()
2212

    
2213
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
2214

    
2215
    for shared in [0, 1]:
2216
      lock.acquire()
2217
      try:
2218
        self.assertEqual(len(self.lm._locks), 1)
2219
        result = self.lm.QueryLocks(["name", "mode", "owner"])
2220
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2221
                         [[(constants.RS_NORMAL, lock.name),
2222
                           (constants.RS_NORMAL, "exclusive"),
2223
                           (constants.RS_NORMAL,
2224
                            [threading.currentThread().getName()])]])
2225

    
2226
        threads = []
2227

    
2228
        first = threading.Event()
2229
        prev = first
2230

    
2231
        for i in range(5):
2232
          ev = threading.Event()
2233
          threads.append(self._addThread(target=_Acquire,
2234
                                          args=(lock, shared, prev, ev)))
2235
          prev = ev
2236

    
2237
        # Start acquires
2238
        first.set()
2239

    
2240
        # Wait for last acquire to start waiting
2241
        prev.wait()
2242

    
2243
        # NOTE: This works only because QueryLocks will acquire the
2244
        # lock-internal lock again and won't be able to get the information
2245
        # until it has the lock. By then the acquire should be registered in
2246
        # SharedLock.__pending (otherwise it's a bug).
2247

    
2248
        # All acquires are waiting now
2249
        if shared:
2250
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2251
        else:
2252
          pending = [("exclusive", [t.getName()]) for t in threads]
2253

    
2254
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2255
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2256
                         [[(constants.RS_NORMAL, lock.name),
2257
                           (constants.RS_NORMAL, "exclusive"),
2258
                           (constants.RS_NORMAL,
2259
                            [threading.currentThread().getName()]),
2260
                           (constants.RS_NORMAL, pending)]])
2261

    
2262
        self.assertEqual(len(self.lm._locks), 1)
2263
      finally:
2264
        lock.release()
2265

    
2266
      self._waitThreads()
2267

    
2268
      # No pending acquires
2269
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2270
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2271
                       [[(constants.RS_NORMAL, lock.name),
2272
                         (constants.RS_NORMAL, None),
2273
                         (constants.RS_NORMAL, None),
2274
                         (constants.RS_NORMAL, [])]])
2275

    
2276
      self.assertEqual(len(self.lm._locks), 1)
2277

    
2278
  def testDeleteAndRecreate(self):
2279
    lname = "TestLock101923193"
2280

    
2281
    # Create some locks with the same name and keep all references
2282
    locks = [locking.SharedLock(lname, monitor=self.lm)
2283
             for _ in range(5)]
2284

    
2285
    self.assertEqual(len(self.lm._locks), len(locks))
2286

    
2287
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2288
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2289
                     [[(constants.RS_NORMAL, lname),
2290
                       (constants.RS_NORMAL, None),
2291
                       (constants.RS_NORMAL, None)]] * 5)
2292

    
2293
    locks[2].delete()
2294

    
2295
    # Check information order
2296
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2297
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2298
                     [[(constants.RS_NORMAL, lname),
2299
                       (constants.RS_NORMAL, None),
2300
                       (constants.RS_NORMAL, None)]] * 2 +
2301
                     [[(constants.RS_NORMAL, lname),
2302
                       (constants.RS_NORMAL, "deleted"),
2303
                       (constants.RS_NORMAL, None)]] +
2304
                     [[(constants.RS_NORMAL, lname),
2305
                       (constants.RS_NORMAL, None),
2306
                       (constants.RS_NORMAL, None)]] * 2)
2307

    
2308
    locks[1].acquire(shared=0)
2309

    
2310
    last_status = [
2311
      [(constants.RS_NORMAL, lname),
2312
       (constants.RS_NORMAL, None),
2313
       (constants.RS_NORMAL, None)],
2314
      [(constants.RS_NORMAL, lname),
2315
       (constants.RS_NORMAL, "exclusive"),
2316
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
2317
      [(constants.RS_NORMAL, lname),
2318
       (constants.RS_NORMAL, "deleted"),
2319
       (constants.RS_NORMAL, None)],
2320
      [(constants.RS_NORMAL, lname),
2321
       (constants.RS_NORMAL, None),
2322
       (constants.RS_NORMAL, None)],
2323
      [(constants.RS_NORMAL, lname),
2324
       (constants.RS_NORMAL, None),
2325
       (constants.RS_NORMAL, None)],
2326
      ]
2327

    
2328
    # Check information order
2329
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2330
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2331

    
2332
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2333
    self.assertEqual(len(self.lm._locks), len(locks))
2334

    
2335
    # Check lock deletion
2336
    for idx in range(len(locks)):
2337
      del locks[0]
2338
      assert gc.isenabled()
2339
      gc.collect()
2340
      self.assertEqual(len(self.lm._locks), len(locks))
2341
      result = self.lm.QueryLocks(["name", "mode", "owner"])
2342
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2343
                       last_status[idx + 1:])
2344

    
2345
    # All locks should have been deleted
2346
    assert not locks
2347
    self.assertFalse(self.lm._locks)
2348

    
2349
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2350
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2351

    
2352
  class _FakeLock:
2353
    def __init__(self):
2354
      self._info = []
2355

    
2356
    def AddResult(self, *args):
2357
      self._info.append(args)
2358

    
2359
    def CountPending(self):
2360
      return len(self._info)
2361

    
2362
    def GetLockInfo(self, requested):
2363
      (exp_requested, result) = self._info.pop(0)
2364

    
2365
      if exp_requested != requested:
2366
        raise Exception("Requested information (%s) does not match"
2367
                        " expectations (%s)" % (requested, exp_requested))
2368

    
2369
      return result
2370

    
2371
  def testMultipleResults(self):
2372
    fl1 = self._FakeLock()
2373
    fl2 = self._FakeLock()
2374

    
2375
    self.lm.RegisterLock(fl1)
2376
    self.lm.RegisterLock(fl2)
2377

    
2378
    # Empty information
2379
    for i in [fl1, fl2]:
2380
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2381
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2382
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2383
    for i in [fl1, fl2]:
2384
      self.assertEqual(i.CountPending(), 0)
2385

    
2386
    # Check ordering
2387
    for fn in [lambda x: x, reversed, sorted]:
2388
      fl1.AddResult(set(), list(fn([
2389
        ("aaa", None, None, None),
2390
        ("bbb", None, None, None),
2391
        ])))
2392
      fl2.AddResult(set(), [])
2393
      result = self.lm.QueryLocks(["name"])
2394
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2395
        [(constants.RS_NORMAL, "aaa")],
2396
        [(constants.RS_NORMAL, "bbb")],
2397
        ])
2398
      for i in [fl1, fl2]:
2399
        self.assertEqual(i.CountPending(), 0)
2400

    
2401
      for fn2 in [lambda x: x, reversed, sorted]:
2402
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
2403
          # Same name, but different information
2404
          ("aaa", "mode0", None, None),
2405
          ("aaa", "mode1", None, None),
2406
          ("aaa", "mode2", None, None),
2407
          ("aaa", "mode3", None, None),
2408
          ])))
2409
        fl2.AddResult(set([query.LQ_MODE]), [
2410
          ("zzz", "end", None, None),
2411
          ("000", "start", None, None),
2412
          ] + list(fn2([
2413
          ("aaa", "b200", None, None),
2414
          ("aaa", "b300", None, None),
2415
          ])))
2416
        result = self.lm.QueryLocks(["name", "mode"])
2417
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2418
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2419
          ] + list(fn([
2420
          # Name is the same, so order must be equal to incoming order
2421
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2422
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2423
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2424
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2425
          ])) + list(fn2([
2426
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2427
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2428
          ])) + [
2429
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2430
          ])
2431
        for i in [fl1, fl2]:
2432
          self.assertEqual(i.CountPending(), 0)
2433

    
2434

    
2435
if __name__ == "__main__":
2436
  testutils.GanetiTestProgram()