Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 415feb2e

History | View | Annotate | Download (79 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30
import random
31
import gc
32
import itertools
33

    
34
from ganeti import constants
35
from ganeti import locking
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39
from ganeti import objects
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
# This is used to test the ssynchronize decorator.
46
# Since it's passed as input to a decorator it must be declared as a global.
47
_decoratorlock = locking.SharedLock("decorator lock")
48

    
49
#: List for looping tests
50
ITERATIONS = range(8)
51

    
52

    
53
def _Repeat(fn):
54
  """Decorator for executing a function many times"""
55
  def wrapper(*args, **kwargs):
56
    for i in ITERATIONS:
57
      fn(*args, **kwargs)
58
  return wrapper
59

    
60

    
61
def SafeSleep(duration):
62
  start = time.time()
63
  while True:
64
    delay = start + duration - time.time()
65
    if delay <= 0.0:
66
      break
67
    time.sleep(delay)
68

    
69

    
70
class _ThreadedTestCase(unittest.TestCase):
71
  """Test class that supports adding/waiting on threads"""
72
  def setUp(self):
73
    unittest.TestCase.setUp(self)
74
    self.done = Queue.Queue(0)
75
    self.threads = []
76

    
77
  def _addThread(self, *args, **kwargs):
78
    """Create and remember a new thread"""
79
    t = threading.Thread(*args, **kwargs)
80
    self.threads.append(t)
81
    t.start()
82
    return t
83

    
84
  def _waitThreads(self):
85
    """Wait for all our threads to finish"""
86
    for t in self.threads:
87
      t.join(60)
88
      self.failIf(t.isAlive())
89
    self.threads = []
90

    
91

    
92
class _ConditionTestCase(_ThreadedTestCase):
93
  """Common test case for conditions"""
94

    
95
  def setUp(self, cls):
96
    _ThreadedTestCase.setUp(self)
97
    self.lock = threading.Lock()
98
    self.cond = cls(self.lock)
99

    
100
  def _testAcquireRelease(self):
101
    self.assertFalse(self.cond._is_owned())
102
    self.assertRaises(RuntimeError, self.cond.wait, None)
103
    self.assertRaises(RuntimeError, self.cond.notifyAll)
104

    
105
    self.cond.acquire()
106
    self.assert_(self.cond._is_owned())
107
    self.cond.notifyAll()
108
    self.assert_(self.cond._is_owned())
109
    self.cond.release()
110

    
111
    self.assertFalse(self.cond._is_owned())
112
    self.assertRaises(RuntimeError, self.cond.wait, None)
113
    self.assertRaises(RuntimeError, self.cond.notifyAll)
114

    
115
  def _testNotification(self):
116
    def _NotifyAll():
117
      self.done.put("NE")
118
      self.cond.acquire()
119
      self.done.put("NA")
120
      self.cond.notifyAll()
121
      self.done.put("NN")
122
      self.cond.release()
123

    
124
    self.cond.acquire()
125
    self._addThread(target=_NotifyAll)
126
    self.assertEqual(self.done.get(True, 1), "NE")
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.wait(None)
129
    self.assertEqual(self.done.get(True, 1), "NA")
130
    self.assertEqual(self.done.get(True, 1), "NN")
131
    self.assert_(self.cond._is_owned())
132
    self.cond.release()
133
    self.assertFalse(self.cond._is_owned())
134

    
135

    
136
class TestSingleNotifyPipeCondition(_ConditionTestCase):
137
  """SingleNotifyPipeCondition tests"""
138

    
139
  def setUp(self):
140
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141

    
142
  def testAcquireRelease(self):
143
    self._testAcquireRelease()
144

    
145
  def testNotification(self):
146
    self._testNotification()
147

    
148
  def testWaitReuse(self):
149
    self.cond.acquire()
150
    self.cond.wait(0)
151
    self.cond.wait(0.1)
152
    self.cond.release()
153

    
154
  def testNoNotifyReuse(self):
155
    self.cond.acquire()
156
    self.cond.notifyAll()
157
    self.assertRaises(RuntimeError, self.cond.wait, None)
158
    self.assertRaises(RuntimeError, self.cond.notifyAll)
159
    self.cond.release()
160

    
161

    
162
class TestPipeCondition(_ConditionTestCase):
163
  """PipeCondition tests"""
164

    
165
  def setUp(self):
166
    _ConditionTestCase.setUp(self, locking.PipeCondition)
167

    
168
  def testAcquireRelease(self):
169
    self._testAcquireRelease()
170

    
171
  def testNotification(self):
172
    self._testNotification()
173

    
174
  def _TestWait(self, fn):
175
    threads = [
176
      self._addThread(target=fn),
177
      self._addThread(target=fn),
178
      self._addThread(target=fn),
179
      ]
180

    
181
    # Wait for threads to be waiting
182
    for _ in threads:
183
      self.assertEqual(self.done.get(True, 1), "A")
184

    
185
    self.assertRaises(Queue.Empty, self.done.get_nowait)
186

    
187
    self.cond.acquire()
188
    self.assertEqual(len(self.cond._waiters), 3)
189
    self.assertEqual(self.cond._waiters, set(threads))
190
    # This new thread can't acquire the lock, and thus call wait, before we
191
    # release it
192
    self._addThread(target=fn)
193
    self.cond.notifyAll()
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195
    self.cond.release()
196

    
197
    # We should now get 3 W and 1 A (for the new thread) in whatever order
198
    w = 0
199
    a = 0
200
    for i in range(4):
201
      got = self.done.get(True, 1)
202
      if got == "W":
203
        w += 1
204
      elif got == "A":
205
        a += 1
206
      else:
207
        self.fail("Got %s on the done queue" % got)
208

    
209
    self.assertEqual(w, 3)
210
    self.assertEqual(a, 1)
211

    
212
    self.cond.acquire()
213
    self.cond.notifyAll()
214
    self.cond.release()
215
    self._waitThreads()
216
    self.assertEqual(self.done.get_nowait(), "W")
217
    self.assertRaises(Queue.Empty, self.done.get_nowait)
218

    
219
  def testBlockingWait(self):
220
    def _BlockingWait():
221
      self.cond.acquire()
222
      self.done.put("A")
223
      self.cond.wait(None)
224
      self.cond.release()
225
      self.done.put("W")
226

    
227
    self._TestWait(_BlockingWait)
228

    
229
  def testLongTimeoutWait(self):
230
    def _Helper():
231
      self.cond.acquire()
232
      self.done.put("A")
233
      self.cond.wait(15.0)
234
      self.cond.release()
235
      self.done.put("W")
236

    
237
    self._TestWait(_Helper)
238

    
239
  def _TimeoutWait(self, timeout, check):
240
    self.cond.acquire()
241
    self.cond.wait(timeout)
242
    self.cond.release()
243
    self.done.put(check)
244

    
245
  def testShortTimeoutWait(self):
246
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248
    self._waitThreads()
249
    self.assertEqual(self.done.get_nowait(), "T1")
250
    self.assertEqual(self.done.get_nowait(), "T1")
251
    self.assertRaises(Queue.Empty, self.done.get_nowait)
252

    
253
  def testZeroTimeoutWait(self):
254
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257
    self._waitThreads()
258
    self.assertEqual(self.done.get_nowait(), "T0")
259
    self.assertEqual(self.done.get_nowait(), "T0")
260
    self.assertEqual(self.done.get_nowait(), "T0")
261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
262

    
263

    
264
class TestSharedLock(_ThreadedTestCase):
265
  """SharedLock tests"""
266

    
267
  def setUp(self):
268
    _ThreadedTestCase.setUp(self)
269
    self.sl = locking.SharedLock("TestSharedLock")
270

    
271
  def testSequenceAndOwnership(self):
272
    self.assertFalse(self.sl.is_owned())
273
    self.sl.acquire(shared=1)
274
    self.assert_(self.sl.is_owned())
275
    self.assert_(self.sl.is_owned(shared=1))
276
    self.assertFalse(self.sl.is_owned(shared=0))
277
    self.sl.release()
278
    self.assertFalse(self.sl.is_owned())
279
    self.sl.acquire()
280
    self.assert_(self.sl.is_owned())
281
    self.assertFalse(self.sl.is_owned(shared=1))
282
    self.assert_(self.sl.is_owned(shared=0))
283
    self.sl.release()
284
    self.assertFalse(self.sl.is_owned())
285
    self.sl.acquire(shared=1)
286
    self.assert_(self.sl.is_owned())
287
    self.assert_(self.sl.is_owned(shared=1))
288
    self.assertFalse(self.sl.is_owned(shared=0))
289
    self.sl.release()
290
    self.assertFalse(self.sl.is_owned())
291

    
292
  def testBooleanValue(self):
293
    # semaphores are supposed to return a true value on a successful acquire
294
    self.assert_(self.sl.acquire(shared=1))
295
    self.sl.release()
296
    self.assert_(self.sl.acquire())
297
    self.sl.release()
298

    
299
  def testDoubleLockingStoE(self):
300
    self.sl.acquire(shared=1)
301
    self.assertRaises(AssertionError, self.sl.acquire)
302

    
303
  def testDoubleLockingEtoS(self):
304
    self.sl.acquire()
305
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306

    
307
  def testDoubleLockingStoS(self):
308
    self.sl.acquire(shared=1)
309
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310

    
311
  def testDoubleLockingEtoE(self):
312
    self.sl.acquire()
313
    self.assertRaises(AssertionError, self.sl.acquire)
314

    
315
  # helper functions: called in a separate thread they acquire the lock, send
316
  # their identifier on the done queue, then release it.
317
  def _doItSharer(self):
318
    try:
319
      self.sl.acquire(shared=1)
320
      self.done.put('SHR')
321
      self.sl.release()
322
    except errors.LockError:
323
      self.done.put('ERR')
324

    
325
  def _doItExclusive(self):
326
    try:
327
      self.sl.acquire()
328
      self.done.put('EXC')
329
      self.sl.release()
330
    except errors.LockError:
331
      self.done.put('ERR')
332

    
333
  def _doItDelete(self):
334
    try:
335
      self.sl.delete()
336
      self.done.put('DEL')
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def testSharersCanCoexist(self):
341
    self.sl.acquire(shared=1)
342
    threading.Thread(target=self._doItSharer).start()
343
    self.assert_(self.done.get(True, 1))
344
    self.sl.release()
345

    
346
  @_Repeat
347
  def testExclusiveBlocksExclusive(self):
348
    self.sl.acquire()
349
    self._addThread(target=self._doItExclusive)
350
    self.assertRaises(Queue.Empty, self.done.get_nowait)
351
    self.sl.release()
352
    self._waitThreads()
353
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
354

    
355
  @_Repeat
356
  def testExclusiveBlocksDelete(self):
357
    self.sl.acquire()
358
    self._addThread(target=self._doItDelete)
359
    self.assertRaises(Queue.Empty, self.done.get_nowait)
360
    self.sl.release()
361
    self._waitThreads()
362
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363
    self.sl = locking.SharedLock(self.sl.name)
364

    
365
  @_Repeat
366
  def testExclusiveBlocksSharer(self):
367
    self.sl.acquire()
368
    self._addThread(target=self._doItSharer)
369
    self.assertRaises(Queue.Empty, self.done.get_nowait)
370
    self.sl.release()
371
    self._waitThreads()
372
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
373

    
374
  @_Repeat
375
  def testSharerBlocksExclusive(self):
376
    self.sl.acquire(shared=1)
377
    self._addThread(target=self._doItExclusive)
378
    self.assertRaises(Queue.Empty, self.done.get_nowait)
379
    self.sl.release()
380
    self._waitThreads()
381
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382

    
383
  @_Repeat
384
  def testSharerBlocksDelete(self):
385
    self.sl.acquire(shared=1)
386
    self._addThread(target=self._doItDelete)
387
    self.assertRaises(Queue.Empty, self.done.get_nowait)
388
    self.sl.release()
389
    self._waitThreads()
390
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391
    self.sl = locking.SharedLock(self.sl.name)
392

    
393
  @_Repeat
394
  def testWaitingExclusiveBlocksSharer(self):
395
    """SKIPPED testWaitingExclusiveBlockSharer"""
396
    return
397

    
398
    self.sl.acquire(shared=1)
399
    # the lock is acquired in shared mode...
400
    self._addThread(target=self._doItExclusive)
401
    # ...but now an exclusive is waiting...
402
    self._addThread(target=self._doItSharer)
403
    # ...so the sharer should be blocked as well
404
    self.assertRaises(Queue.Empty, self.done.get_nowait)
405
    self.sl.release()
406
    self._waitThreads()
407
    # The exclusive passed before
408
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
410

    
411
  @_Repeat
412
  def testWaitingSharerBlocksExclusive(self):
413
    """SKIPPED testWaitingSharerBlocksExclusive"""
414
    return
415

    
416
    self.sl.acquire()
417
    # the lock is acquired in exclusive mode...
418
    self._addThread(target=self._doItSharer)
419
    # ...but now a sharer is waiting...
420
    self._addThread(target=self._doItExclusive)
421
    # ...the exclusive is waiting too...
422
    self.assertRaises(Queue.Empty, self.done.get_nowait)
423
    self.sl.release()
424
    self._waitThreads()
425
    # The sharer passed before
426
    self.assertEqual(self.done.get_nowait(), 'SHR')
427
    self.assertEqual(self.done.get_nowait(), 'EXC')
428

    
429
  def testDelete(self):
430
    self.sl.delete()
431
    self.assertRaises(errors.LockError, self.sl.acquire)
432
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433
    self.assertRaises(errors.LockError, self.sl.delete)
434

    
435
  def testDeleteTimeout(self):
436
    self.sl.delete(timeout=60)
437

    
438
  def testNoDeleteIfSharer(self):
439
    self.sl.acquire(shared=1)
440
    self.assertRaises(AssertionError, self.sl.delete)
441

    
442
  @_Repeat
443
  def testDeletePendingSharersExclusiveDelete(self):
444
    self.sl.acquire()
445
    self._addThread(target=self._doItSharer)
446
    self._addThread(target=self._doItSharer)
447
    self._addThread(target=self._doItExclusive)
448
    self._addThread(target=self._doItDelete)
449
    self.sl.delete()
450
    self._waitThreads()
451
    # The threads who were pending return ERR
452
    for _ in range(4):
453
      self.assertEqual(self.done.get_nowait(), 'ERR')
454
    self.sl = locking.SharedLock(self.sl.name)
455

    
456
  @_Repeat
457
  def testDeletePendingDeleteExclusiveSharers(self):
458
    self.sl.acquire()
459
    self._addThread(target=self._doItDelete)
460
    self._addThread(target=self._doItExclusive)
461
    self._addThread(target=self._doItSharer)
462
    self._addThread(target=self._doItSharer)
463
    self.sl.delete()
464
    self._waitThreads()
465
    # The two threads who were pending return both ERR
466
    self.assertEqual(self.done.get_nowait(), 'ERR')
467
    self.assertEqual(self.done.get_nowait(), 'ERR')
468
    self.assertEqual(self.done.get_nowait(), 'ERR')
469
    self.assertEqual(self.done.get_nowait(), 'ERR')
470
    self.sl = locking.SharedLock(self.sl.name)
471

    
472
  @_Repeat
473
  def testExclusiveAcquireTimeout(self):
474
    for shared in [0, 1]:
475
      on_queue = threading.Event()
476
      release_exclusive = threading.Event()
477

    
478
      def _LockExclusive():
479
        self.sl.acquire(shared=0, test_notify=on_queue.set)
480
        self.done.put("A: start wait")
481
        release_exclusive.wait()
482
        self.done.put("A: end wait")
483
        self.sl.release()
484

    
485
      # Start thread to hold lock in exclusive mode
486
      self._addThread(target=_LockExclusive)
487

    
488
      # Wait for wait to begin
489
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
490

    
491
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492
      # on the queue
493
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494
                                      test_notify=release_exclusive.set))
495

    
496
      self.done.put("got 2nd")
497
      self.sl.release()
498

    
499
      self._waitThreads()
500

    
501
      self.assertEqual(self.done.get_nowait(), "A: end wait")
502
      self.assertEqual(self.done.get_nowait(), "got 2nd")
503
      self.assertRaises(Queue.Empty, self.done.get_nowait)
504

    
505
  @_Repeat
506
  def testAcquireExpiringTimeout(self):
507
    def _AcquireWithTimeout(shared, timeout):
508
      if not self.sl.acquire(shared=shared, timeout=timeout):
509
        self.done.put("timeout")
510

    
511
    for shared in [0, 1]:
512
      # Lock exclusively
513
      self.sl.acquire()
514

    
515
      # Start shared acquires with timeout between 0 and 20 ms
516
      for i in range(11):
517
        self._addThread(target=_AcquireWithTimeout,
518
                        args=(shared, i * 2.0 / 1000.0))
519

    
520
      # Wait for threads to finish (makes sure the acquire timeout expires
521
      # before releasing the lock)
522
      self._waitThreads()
523

    
524
      # Release lock
525
      self.sl.release()
526

    
527
      for _ in range(11):
528
        self.assertEqual(self.done.get_nowait(), "timeout")
529

    
530
      self.assertRaises(Queue.Empty, self.done.get_nowait)
531

    
532
  @_Repeat
533
  def testSharedSkipExclusiveAcquires(self):
534
    # Tests whether shared acquires jump in front of exclusive acquires in the
535
    # queue.
536

    
537
    def _Acquire(shared, name, notify_ev, wait_ev):
538
      if notify_ev:
539
        notify_fn = notify_ev.set
540
      else:
541
        notify_fn = None
542

    
543
      if wait_ev:
544
        wait_ev.wait()
545

    
546
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
547
        return
548

    
549
      self.done.put(name)
550
      self.sl.release()
551

    
552
    # Get exclusive lock while we fill the queue
553
    self.sl.acquire()
554

    
555
    shrcnt1 = 5
556
    shrcnt2 = 7
557
    shrcnt3 = 9
558
    shrcnt4 = 2
559

    
560
    # Add acquires using threading.Event for synchronization. They'll be
561
    # acquired exactly in the order defined in this list.
562
    acquires = (shrcnt1 * [(1, "shared 1")] +
563
                3 * [(0, "exclusive 1")] +
564
                shrcnt2 * [(1, "shared 2")] +
565
                shrcnt3 * [(1, "shared 3")] +
566
                shrcnt4 * [(1, "shared 4")] +
567
                3 * [(0, "exclusive 2")])
568

    
569
    ev_cur = None
570
    ev_prev = None
571

    
572
    for args in acquires:
573
      ev_cur = threading.Event()
574
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
575
      ev_prev = ev_cur
576

    
577
    # Wait for last acquire to start
578
    ev_prev.wait()
579

    
580
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
581
    # together
582
    self.assertEqual(self.sl._count_pending(), 7)
583

    
584
    # Release exclusive lock and wait
585
    self.sl.release()
586

    
587
    self._waitThreads()
588

    
589
    # Check sequence
590
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591
      # Shared locks aren't guaranteed to be notified in order, but they'll be
592
      # first
593
      tmp = self.done.get_nowait()
594
      if tmp == "shared 1":
595
        shrcnt1 -= 1
596
      elif tmp == "shared 2":
597
        shrcnt2 -= 1
598
      elif tmp == "shared 3":
599
        shrcnt3 -= 1
600
      elif tmp == "shared 4":
601
        shrcnt4 -= 1
602
    self.assertEqual(shrcnt1, 0)
603
    self.assertEqual(shrcnt2, 0)
604
    self.assertEqual(shrcnt3, 0)
605
    self.assertEqual(shrcnt3, 0)
606

    
607
    for _ in range(3):
608
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
609

    
610
    for _ in range(3):
611
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
612

    
613
    self.assertRaises(Queue.Empty, self.done.get_nowait)
614

    
615
  def testIllegalDowngrade(self):
616
    # Not yet acquired
617
    self.assertRaises(AssertionError, self.sl.downgrade)
618

    
619
    # Acquire in shared mode, downgrade should be no-op
620
    self.assertTrue(self.sl.acquire(shared=1))
621
    self.assertTrue(self.sl.is_owned(shared=1))
622
    self.assertTrue(self.sl.downgrade())
623
    self.assertTrue(self.sl.is_owned(shared=1))
624
    self.sl.release()
625

    
626
  def testDowngrade(self):
627
    self.assertTrue(self.sl.acquire())
628
    self.assertTrue(self.sl.is_owned(shared=0))
629
    self.assertTrue(self.sl.downgrade())
630
    self.assertTrue(self.sl.is_owned(shared=1))
631
    self.sl.release()
632

    
633
  @_Repeat
634
  def testDowngradeJumpsAheadOfExclusive(self):
635
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636
      self.assertTrue(self.sl.acquire())
637
      self.assertTrue(self.sl.is_owned(shared=0))
638
      ev_got.set()
639
      ev_downgrade.wait()
640
      self.assertTrue(self.sl.is_owned(shared=0))
641
      self.assertTrue(self.sl.downgrade())
642
      self.assertTrue(self.sl.is_owned(shared=1))
643
      ev_release.wait()
644
      self.assertTrue(self.sl.is_owned(shared=1))
645
      self.sl.release()
646

    
647
    def _KeepExclusive2(ev_started, ev_release):
648
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649
      self.assertTrue(self.sl.is_owned(shared=0))
650
      ev_release.wait()
651
      self.assertTrue(self.sl.is_owned(shared=0))
652
      self.sl.release()
653

    
654
    def _KeepShared(ev_started, ev_got, ev_release):
655
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656
      self.assertTrue(self.sl.is_owned(shared=1))
657
      ev_got.set()
658
      ev_release.wait()
659
      self.assertTrue(self.sl.is_owned(shared=1))
660
      self.sl.release()
661

    
662
    # Acquire lock in exclusive mode
663
    ev_got_excl1 = threading.Event()
664
    ev_downgrade_excl1 = threading.Event()
665
    ev_release_excl1 = threading.Event()
666
    th_excl1 = self._addThread(target=_KeepExclusive,
667
                               args=(ev_got_excl1, ev_downgrade_excl1,
668
                                     ev_release_excl1))
669
    ev_got_excl1.wait()
670

    
671
    # Start a second exclusive acquire
672
    ev_started_excl2 = threading.Event()
673
    ev_release_excl2 = threading.Event()
674
    th_excl2 = self._addThread(target=_KeepExclusive2,
675
                               args=(ev_started_excl2, ev_release_excl2))
676
    ev_started_excl2.wait()
677

    
678
    # Start shared acquires, will jump ahead of second exclusive acquire when
679
    # first exclusive acquire downgrades
680
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681
    ev_release_shared = threading.Event()
682

    
683
    th_shared = [self._addThread(target=_KeepShared,
684
                                 args=(ev_started, ev_got, ev_release_shared))
685
                 for (ev_started, ev_got) in ev_shared]
686

    
687
    # Wait for all shared acquires to start
688
    for (ev, _) in ev_shared:
689
      ev.wait()
690

    
691
    # Check lock information
692
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
694
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
695
    self.assertEqual([(pendmode, sorted(waiting))
696
                      for (pendmode, waiting) in pending],
697
                     [("exclusive", [th_excl2.getName()]),
698
                      ("shared", sorted(th.getName() for th in th_shared))])
699

    
700
    # Shared acquires won't start until the exclusive lock is downgraded
701
    ev_downgrade_excl1.set()
702

    
703
    # Wait for all shared acquires to be successful
704
    for (_, ev) in ev_shared:
705
      ev.wait()
706

    
707
    # Check lock information again
708
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
709
                                              query.LQ_PENDING])),
710
                     [(self.sl.name, "shared", None,
711
                       [("exclusive", [th_excl2.getName()])])])
712
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
713
    self.assertEqual(set(owner), set([th_excl1.getName()] +
714
                                     [th.getName() for th in th_shared]))
715

    
716
    ev_release_excl1.set()
717
    ev_release_excl2.set()
718
    ev_release_shared.set()
719

    
720
    self._waitThreads()
721

    
722
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
723
                                              query.LQ_PENDING])),
724
                     [(self.sl.name, None, None, [])])
725

    
726
  @_Repeat
727
  def testMixedAcquireTimeout(self):
728
    sync = threading.Event()
729

    
730
    def _AcquireShared(ev):
731
      if not self.sl.acquire(shared=1, timeout=None):
732
        return
733

    
734
      self.done.put("shared")
735

    
736
      # Notify main thread
737
      ev.set()
738

    
739
      # Wait for notification from main thread
740
      sync.wait()
741

    
742
      # Release lock
743
      self.sl.release()
744

    
745
    acquires = []
746
    for _ in range(3):
747
      ev = threading.Event()
748
      self._addThread(target=_AcquireShared, args=(ev, ))
749
      acquires.append(ev)
750

    
751
    # Wait for all acquires to finish
752
    for i in acquires:
753
      i.wait()
754

    
755
    self.assertEqual(self.sl._count_pending(), 0)
756

    
757
    # Try to get exclusive lock
758
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
759

    
760
    # Acquire exclusive without timeout
761
    exclsync = threading.Event()
762
    exclev = threading.Event()
763

    
764
    def _AcquireExclusive():
765
      if not self.sl.acquire(shared=0):
766
        return
767

    
768
      self.done.put("exclusive")
769

    
770
      # Notify main thread
771
      exclev.set()
772

    
773
      # Wait for notification from main thread
774
      exclsync.wait()
775

    
776
      self.sl.release()
777

    
778
    self._addThread(target=_AcquireExclusive)
779

    
780
    # Try to get exclusive lock
781
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
782

    
783
    # Make all shared holders release their locks
784
    sync.set()
785

    
786
    # Wait for exclusive acquire to succeed
787
    exclev.wait()
788

    
789
    self.assertEqual(self.sl._count_pending(), 0)
790

    
791
    # Try to get exclusive lock
792
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
793

    
794
    def _AcquireSharedSimple():
795
      if self.sl.acquire(shared=1, timeout=None):
796
        self.done.put("shared2")
797
        self.sl.release()
798

    
799
    for _ in range(10):
800
      self._addThread(target=_AcquireSharedSimple)
801

    
802
    # Tell exclusive lock to release
803
    exclsync.set()
804

    
805
    # Wait for everything to finish
806
    self._waitThreads()
807

    
808
    self.assertEqual(self.sl._count_pending(), 0)
809

    
810
    # Check sequence
811
    for _ in range(3):
812
      self.assertEqual(self.done.get_nowait(), "shared")
813

    
814
    self.assertEqual(self.done.get_nowait(), "exclusive")
815

    
816
    for _ in range(10):
817
      self.assertEqual(self.done.get_nowait(), "shared2")
818

    
819
    self.assertRaises(Queue.Empty, self.done.get_nowait)
820

    
821
  def testPriority(self):
822
    # Acquire in exclusive mode
823
    self.assert_(self.sl.acquire(shared=0))
824

    
825
    # Queue acquires
826
    def _Acquire(prev, next, shared, priority, result):
827
      prev.wait()
828
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
829
      try:
830
        self.done.put(result)
831
      finally:
832
        self.sl.release()
833

    
834
    counter = itertools.count(0)
835
    priorities = range(-20, 30)
836
    first = threading.Event()
837
    prev = first
838

    
839
    # Data structure:
840
    # {
841
    #   priority:
842
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
843
    #      (shared/exclusive, ...),
844
    #      ...,
845
    #     ],
846
    # }
847
    perprio = {}
848

    
849
    # References shared acquire per priority in L{perprio}. Data structure:
850
    # {
851
    #   priority: (shared=1, set(acquire names), set(pending threads)),
852
    # }
853
    prioshared = {}
854

    
855
    for seed in [4979, 9523, 14902, 32440]:
856
      # Use a deterministic random generator
857
      rnd = random.Random(seed)
858
      for priority in [rnd.choice(priorities) for _ in range(30)]:
859
        modes = [0, 1]
860
        rnd.shuffle(modes)
861
        for shared in modes:
862
          # Unique name
863
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
864

    
865
          ev = threading.Event()
866
          thread = self._addThread(target=_Acquire,
867
                                   args=(prev, ev, shared, priority, acqname))
868
          prev = ev
869

    
870
          # Record expected aqcuire, see above for structure
871
          data = (shared, set([acqname]), set([thread]))
872
          priolist = perprio.setdefault(priority, [])
873
          if shared:
874
            priosh = prioshared.get(priority, None)
875
            if priosh:
876
              # Shared acquires are merged
877
              for i, j in zip(priosh[1:], data[1:]):
878
                i.update(j)
879
              assert data[0] == priosh[0]
880
            else:
881
              prioshared[priority] = data
882
              priolist.append(data)
883
          else:
884
            priolist.append(data)
885

    
886
    # Start all acquires and wait for them
887
    first.set()
888
    prev.wait()
889

    
890
    # Check lock information
891
    self.assertEqual(self.sl.GetLockInfo(set()),
892
                     [(self.sl.name, None, None, None)])
893
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
894
                     [(self.sl.name, "exclusive",
895
                       [threading.currentThread().getName()], None)])
896

    
897
    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
898
                            perprio)
899

    
900
    # Let threads acquire the lock
901
    self.sl.release()
902

    
903
    # Wait for everything to finish
904
    self._waitThreads()
905

    
906
    self.assert_(self.sl._check_empty())
907

    
908
    # Check acquires by priority
909
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
910
      for (_, names, _) in acquires:
911
        # For shared acquires, the set will contain 1..n entries. For exclusive
912
        # acquires only one.
913
        while names:
914
          names.remove(self.done.get_nowait())
915
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
916

    
917
    self.assertRaises(Queue.Empty, self.done.get_nowait)
918

    
919
  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
920
    self.assertEqual(name, self.sl.name)
921
    self.assert_(mode is None)
922
    self.assert_(owner is None)
923

    
924
    self.assertEqual([(pendmode, sorted(waiting))
925
                      for (pendmode, waiting) in pending],
926
                     [(["exclusive", "shared"][int(bool(shared))],
927
                       sorted(t.getName() for t in threads))
928
                      for acquires in [perprio[i]
929
                                       for i in sorted(perprio.keys())]
930
                      for (shared, _, threads) in acquires])
931

    
932

    
933
class TestSharedLockInCondition(_ThreadedTestCase):
934
  """SharedLock as a condition lock tests"""
935

    
936
  def setUp(self):
937
    _ThreadedTestCase.setUp(self)
938
    self.sl = locking.SharedLock("TestSharedLockInCondition")
939
    self.setCondition()
940

    
941
  def setCondition(self):
942
    self.cond = threading.Condition(self.sl)
943

    
944
  def testKeepMode(self):
945
    self.cond.acquire(shared=1)
946
    self.assert_(self.sl.is_owned(shared=1))
947
    self.cond.wait(0)
948
    self.assert_(self.sl.is_owned(shared=1))
949
    self.cond.release()
950
    self.cond.acquire(shared=0)
951
    self.assert_(self.sl.is_owned(shared=0))
952
    self.cond.wait(0)
953
    self.assert_(self.sl.is_owned(shared=0))
954
    self.cond.release()
955

    
956

    
957
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
958
  """SharedLock as a pipe condition lock tests"""
959

    
960
  def setCondition(self):
961
    self.cond = locking.PipeCondition(self.sl)
962

    
963

    
964
class TestSSynchronizedDecorator(_ThreadedTestCase):
965
  """Shared Lock Synchronized decorator test"""
966

    
967
  def setUp(self):
968
    _ThreadedTestCase.setUp(self)
969

    
970
  @locking.ssynchronized(_decoratorlock)
971
  def _doItExclusive(self):
972
    self.assert_(_decoratorlock.is_owned())
973
    self.done.put('EXC')
974

    
975
  @locking.ssynchronized(_decoratorlock, shared=1)
976
  def _doItSharer(self):
977
    self.assert_(_decoratorlock.is_owned(shared=1))
978
    self.done.put('SHR')
979

    
980
  def testDecoratedFunctions(self):
981
    self._doItExclusive()
982
    self.assertFalse(_decoratorlock.is_owned())
983
    self._doItSharer()
984
    self.assertFalse(_decoratorlock.is_owned())
985

    
986
  def testSharersCanCoexist(self):
987
    _decoratorlock.acquire(shared=1)
988
    threading.Thread(target=self._doItSharer).start()
989
    self.assert_(self.done.get(True, 1))
990
    _decoratorlock.release()
991

    
992
  @_Repeat
993
  def testExclusiveBlocksExclusive(self):
994
    _decoratorlock.acquire()
995
    self._addThread(target=self._doItExclusive)
996
    # give it a bit of time to check that it's not actually doing anything
997
    self.assertRaises(Queue.Empty, self.done.get_nowait)
998
    _decoratorlock.release()
999
    self._waitThreads()
1000
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1001

    
1002
  @_Repeat
1003
  def testExclusiveBlocksSharer(self):
1004
    _decoratorlock.acquire()
1005
    self._addThread(target=self._doItSharer)
1006
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1007
    _decoratorlock.release()
1008
    self._waitThreads()
1009
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1010

    
1011
  @_Repeat
1012
  def testSharerBlocksExclusive(self):
1013
    _decoratorlock.acquire(shared=1)
1014
    self._addThread(target=self._doItExclusive)
1015
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1016
    _decoratorlock.release()
1017
    self._waitThreads()
1018
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1019

    
1020

    
1021
class TestLockSet(_ThreadedTestCase):
1022
  """LockSet tests"""
1023

    
1024
  def setUp(self):
1025
    _ThreadedTestCase.setUp(self)
1026
    self._setUpLS()
1027

    
1028
  def _setUpLS(self):
1029
    """Helper to (re)initialize the lock set"""
1030
    self.resources = ['one', 'two', 'three']
1031
    self.ls = locking.LockSet(self.resources, "TestLockSet")
1032

    
1033
  def testResources(self):
1034
    self.assertEquals(self.ls._names(), set(self.resources))
1035
    newls = locking.LockSet([], "TestLockSet.testResources")
1036
    self.assertEquals(newls._names(), set())
1037

    
1038
  def testCheckOwnedUnknown(self):
1039
    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1040
    for shared in [-1, 0, 1, 6378, 24255]:
1041
      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1042
                                           shared=shared))
1043

    
1044
  def testCheckOwnedUnknownWhileHolding(self):
1045
    self.assertFalse(self.ls.check_owned([]))
1046
    self.ls.acquire("one", shared=1)
1047
    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1048
    self.assertTrue(self.ls.check_owned("one", shared=1))
1049
    self.assertFalse(self.ls.check_owned("one", shared=0))
1050
    self.assertFalse(self.ls.check_owned(["one", "two"]))
1051
    self.assertRaises(errors.LockError, self.ls.check_owned,
1052
                      ["one", "nonexist"])
1053
    self.assertRaises(errors.LockError, self.ls.check_owned, "")
1054
    self.ls.release()
1055
    self.assertFalse(self.ls.check_owned([]))
1056
    self.assertFalse(self.ls.check_owned("one"))
1057

    
1058
  def testAcquireRelease(self):
1059
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1060
    self.assert_(self.ls.acquire('one'))
1061
    self.assertEquals(self.ls.list_owned(), set(['one']))
1062
    self.assertTrue(self.ls.check_owned("one"))
1063
    self.assertTrue(self.ls.check_owned("one", shared=0))
1064
    self.assertFalse(self.ls.check_owned("one", shared=1))
1065
    self.ls.release()
1066
    self.assertEquals(self.ls.list_owned(), set())
1067
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1068
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
1069
    self.assertEquals(self.ls.list_owned(), set(['one']))
1070
    self.ls.release()
1071
    self.assertEquals(self.ls.list_owned(), set())
1072
    self.ls.acquire(['one', 'two', 'three'])
1073
    self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
1074
    self.assertTrue(self.ls.check_owned(self.ls._names()))
1075
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1076
    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1077
    self.ls.release('one')
1078
    self.assertFalse(self.ls.check_owned(["one"]))
1079
    self.assertTrue(self.ls.check_owned(["two", "three"]))
1080
    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1081
    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1082
    self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
1083
    self.ls.release(['three'])
1084
    self.assertEquals(self.ls.list_owned(), set(['two']))
1085
    self.ls.release()
1086
    self.assertEquals(self.ls.list_owned(), set())
1087
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1088
    self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
1089
    self.ls.release()
1090
    self.assertEquals(self.ls.list_owned(), set())
1091
    for name in self.ls._names():
1092
      self.assertFalse(self.ls.check_owned(name))
1093

    
1094
  def testNoDoubleAcquire(self):
1095
    self.ls.acquire('one')
1096
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
1097
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1098
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1099
    self.ls.release()
1100
    self.ls.acquire(['one', 'three'])
1101
    self.ls.release('one')
1102
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1103
    self.ls.release('three')
1104

    
1105
  def testNoWrongRelease(self):
1106
    self.assertRaises(AssertionError, self.ls.release)
1107
    self.ls.acquire('one')
1108
    self.assertRaises(AssertionError, self.ls.release, 'two')
1109

    
1110
  def testAddRemove(self):
1111
    self.ls.add('four')
1112
    self.assertEquals(self.ls.list_owned(), set())
1113
    self.assert_('four' in self.ls._names())
1114
    self.ls.add(['five', 'six', 'seven'], acquired=1)
1115
    self.assert_('five' in self.ls._names())
1116
    self.assert_('six' in self.ls._names())
1117
    self.assert_('seven' in self.ls._names())
1118
    self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven']))
1119
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1120
    self.assert_('five' not in self.ls._names())
1121
    self.assert_('six' not in self.ls._names())
1122
    self.assertEquals(self.ls.list_owned(), set(['seven']))
1123
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1124
    self.ls.remove('seven')
1125
    self.assert_('seven' not in self.ls._names())
1126
    self.assertEquals(self.ls.list_owned(), set([]))
1127
    self.ls.acquire(None, shared=1)
1128
    self.assertRaises(AssertionError, self.ls.add, 'eight')
1129
    self.ls.release()
1130
    self.ls.acquire(None)
1131
    self.ls.add('eight', acquired=1)
1132
    self.assert_('eight' in self.ls._names())
1133
    self.assert_('eight' in self.ls.list_owned())
1134
    self.ls.add('nine')
1135
    self.assert_('nine' in self.ls._names())
1136
    self.assert_('nine' not in self.ls.list_owned())
1137
    self.ls.release()
1138
    self.ls.remove(['two'])
1139
    self.assert_('two' not in self.ls._names())
1140
    self.ls.acquire('three')
1141
    self.assertEquals(self.ls.remove(['three']), ['three'])
1142
    self.assert_('three' not in self.ls._names())
1143
    self.assertEquals(self.ls.remove('three'), [])
1144
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1145
    self.assert_('one' not in self.ls._names())
1146

    
1147
  def testRemoveNonBlocking(self):
1148
    self.ls.acquire('one')
1149
    self.assertEquals(self.ls.remove('one'), ['one'])
1150
    self.ls.acquire(['two', 'three'])
1151
    self.assertEquals(self.ls.remove(['two', 'three']),
1152
                      ['two', 'three'])
1153

    
1154
  def testNoDoubleAdd(self):
1155
    self.assertRaises(errors.LockError, self.ls.add, 'two')
1156
    self.ls.add('four')
1157
    self.assertRaises(errors.LockError, self.ls.add, 'four')
1158

    
1159
  def testNoWrongRemoves(self):
1160
    self.ls.acquire(['one', 'three'], shared=1)
1161
    # Cannot remove 'two' while holding something which is not a superset
1162
    self.assertRaises(AssertionError, self.ls.remove, 'two')
1163
    # Cannot remove 'three' as we are sharing it
1164
    self.assertRaises(AssertionError, self.ls.remove, 'three')
1165

    
1166
  def testAcquireSetLock(self):
1167
    # acquire the set-lock exclusively
1168
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1169
    self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
1170
    self.assertEquals(self.ls.is_owned(), True)
1171
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1172
    # I can still add/remove elements...
1173
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1174
    self.assert_(self.ls.add('six'))
1175
    self.ls.release()
1176
    # share the set-lock
1177
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1178
    # adding new elements is not possible
1179
    self.assertRaises(AssertionError, self.ls.add, 'five')
1180
    self.ls.release()
1181

    
1182
  def testAcquireWithRepetitions(self):
1183
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1184
                      set(['two', 'two', 'three']))
1185
    self.ls.release(['two', 'two'])
1186
    self.assertEquals(self.ls.list_owned(), set(['three']))
1187

    
1188
  def testEmptyAcquire(self):
1189
    # Acquire an empty list of locks...
1190
    self.assertEquals(self.ls.acquire([]), set())
1191
    self.assertEquals(self.ls.list_owned(), set())
1192
    # New locks can still be addded
1193
    self.assert_(self.ls.add('six'))
1194
    # "re-acquiring" is not an issue, since we had really acquired nothing
1195
    self.assertEquals(self.ls.acquire([], shared=1), set())
1196
    self.assertEquals(self.ls.list_owned(), set())
1197
    # We haven't really acquired anything, so we cannot release
1198
    self.assertRaises(AssertionError, self.ls.release)
1199

    
1200
  def _doLockSet(self, names, shared):
1201
    try:
1202
      self.ls.acquire(names, shared=shared)
1203
      self.done.put('DONE')
1204
      self.ls.release()
1205
    except errors.LockError:
1206
      self.done.put('ERR')
1207

    
1208
  def _doAddSet(self, names):
1209
    try:
1210
      self.ls.add(names, acquired=1)
1211
      self.done.put('DONE')
1212
      self.ls.release()
1213
    except errors.LockError:
1214
      self.done.put('ERR')
1215

    
1216
  def _doRemoveSet(self, names):
1217
    self.done.put(self.ls.remove(names))
1218

    
1219
  @_Repeat
1220
  def testConcurrentSharedAcquire(self):
1221
    self.ls.acquire(['one', 'two'], shared=1)
1222
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1223
    self._waitThreads()
1224
    self.assertEqual(self.done.get_nowait(), 'DONE')
1225
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1226
    self._waitThreads()
1227
    self.assertEqual(self.done.get_nowait(), 'DONE')
1228
    self._addThread(target=self._doLockSet, args=('three', 1))
1229
    self._waitThreads()
1230
    self.assertEqual(self.done.get_nowait(), 'DONE')
1231
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1232
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1233
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1234
    self.ls.release()
1235
    self._waitThreads()
1236
    self.assertEqual(self.done.get_nowait(), 'DONE')
1237
    self.assertEqual(self.done.get_nowait(), 'DONE')
1238

    
1239
  @_Repeat
1240
  def testConcurrentExclusiveAcquire(self):
1241
    self.ls.acquire(['one', 'two'])
1242
    self._addThread(target=self._doLockSet, args=('three', 1))
1243
    self._waitThreads()
1244
    self.assertEqual(self.done.get_nowait(), 'DONE')
1245
    self._addThread(target=self._doLockSet, args=('three', 0))
1246
    self._waitThreads()
1247
    self.assertEqual(self.done.get_nowait(), 'DONE')
1248
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1249
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1250
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1251
    self._addThread(target=self._doLockSet, args=('one', 0))
1252
    self._addThread(target=self._doLockSet, args=('one', 1))
1253
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1254
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1255
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1256
    self.ls.release()
1257
    self._waitThreads()
1258
    for _ in range(6):
1259
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1260

    
1261
  @_Repeat
1262
  def testSimpleAcquireTimeoutExpiring(self):
1263
    names = sorted(self.ls._names())
1264
    self.assert_(len(names) >= 3)
1265

    
1266
    # Get name of first lock
1267
    first = names[0]
1268

    
1269
    # Get name of last lock
1270
    last = names.pop()
1271

    
1272
    checks = [
1273
      # Block first and try to lock it again
1274
      (first, first),
1275

    
1276
      # Block last and try to lock all locks
1277
      (None, first),
1278

    
1279
      # Block last and try to lock it again
1280
      (last, last),
1281
      ]
1282

    
1283
    for (wanted, block) in checks:
1284
      # Lock in exclusive mode
1285
      self.assert_(self.ls.acquire(block, shared=0))
1286

    
1287
      def _AcquireOne():
1288
        # Try to get the same lock again with a timeout (should never succeed)
1289
        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1290
        if acquired:
1291
          self.done.put("acquired")
1292
          self.ls.release()
1293
        else:
1294
          self.assert_(acquired is None)
1295
          self.assertFalse(self.ls.list_owned())
1296
          self.assertFalse(self.ls.is_owned())
1297
          self.done.put("not acquired")
1298

    
1299
      self._addThread(target=_AcquireOne)
1300

    
1301
      # Wait for timeout in thread to expire
1302
      self._waitThreads()
1303

    
1304
      # Release exclusive lock again
1305
      self.ls.release()
1306

    
1307
      self.assertEqual(self.done.get_nowait(), "not acquired")
1308
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1309

    
1310
  @_Repeat
1311
  def testDelayedAndExpiringLockAcquire(self):
1312
    self._setUpLS()
1313
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1314

    
1315
    for expire in (False, True):
1316
      names = sorted(self.ls._names())
1317
      self.assertEqual(len(names), 8)
1318

    
1319
      lock_ev = dict([(i, threading.Event()) for i in names])
1320

    
1321
      # Lock all in exclusive mode
1322
      self.assert_(self.ls.acquire(names, shared=0))
1323

    
1324
      if expire:
1325
        # We'll wait at least 300ms per lock
1326
        lockwait = len(names) * [0.3]
1327

    
1328
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1329
        # this gives us up to 2.4s to fail.
1330
        lockall_timeout = 0.4
1331
      else:
1332
        # This should finish rather quickly
1333
        lockwait = None
1334
        lockall_timeout = len(names) * 5.0
1335

    
1336
      def _LockAll():
1337
        def acquire_notification(name):
1338
          if not expire:
1339
            self.done.put("getting %s" % name)
1340

    
1341
          # Kick next lock
1342
          lock_ev[name].set()
1343

    
1344
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1345
                           test_notify=acquire_notification):
1346
          self.done.put("got all")
1347
          self.ls.release()
1348
        else:
1349
          self.done.put("timeout on all")
1350

    
1351
        # Notify all locks
1352
        for ev in lock_ev.values():
1353
          ev.set()
1354

    
1355
      t = self._addThread(target=_LockAll)
1356

    
1357
      for idx, name in enumerate(names):
1358
        # Wait for actual acquire on this lock to start
1359
        lock_ev[name].wait(10.0)
1360

    
1361
        if expire and t.isAlive():
1362
          # Wait some time after getting the notification to make sure the lock
1363
          # acquire will expire
1364
          SafeSleep(lockwait[idx])
1365

    
1366
        self.ls.release(names=name)
1367

    
1368
      self.assertFalse(self.ls.list_owned())
1369

    
1370
      self._waitThreads()
1371

    
1372
      if expire:
1373
        # Not checking which locks were actually acquired. Doing so would be
1374
        # too timing-dependant.
1375
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1376
      else:
1377
        for i in names:
1378
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1379
        self.assertEqual(self.done.get_nowait(), "got all")
1380
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1381

    
1382
  @_Repeat
1383
  def testConcurrentRemove(self):
1384
    self.ls.add('four')
1385
    self.ls.acquire(['one', 'two', 'four'])
1386
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1387
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1388
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1389
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1390
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1391
    self.ls.remove('one')
1392
    self.ls.release()
1393
    self._waitThreads()
1394
    for i in range(4):
1395
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1396
    self.ls.add(['five', 'six'], acquired=1)
1397
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1398
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1399
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1400
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1401
    self.ls.remove('five')
1402
    self.ls.release()
1403
    self._waitThreads()
1404
    for i in range(4):
1405
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1406
    self.ls.acquire(['three', 'four'])
1407
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1408
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1409
    self.ls.remove('four')
1410
    self._waitThreads()
1411
    self.assertEqual(self.done.get_nowait(), ['six'])
1412
    self._addThread(target=self._doRemoveSet, args=(['two']))
1413
    self._waitThreads()
1414
    self.assertEqual(self.done.get_nowait(), ['two'])
1415
    self.ls.release()
1416
    # reset lockset
1417
    self._setUpLS()
1418

    
1419
  @_Repeat
1420
  def testConcurrentSharedSetLock(self):
1421
    # share the set-lock...
1422
    self.ls.acquire(None, shared=1)
1423
    # ...another thread can share it too
1424
    self._addThread(target=self._doLockSet, args=(None, 1))
1425
    self._waitThreads()
1426
    self.assertEqual(self.done.get_nowait(), 'DONE')
1427
    # ...or just share some elements
1428
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1429
    self._waitThreads()
1430
    self.assertEqual(self.done.get_nowait(), 'DONE')
1431
    # ...but not add new ones or remove any
1432
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1433
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1434
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1435
    # this just releases the set-lock
1436
    self.ls.release([])
1437
    t.join(60)
1438
    self.assertEqual(self.done.get_nowait(), 'DONE')
1439
    # release the lock on the actual elements so remove() can proceed too
1440
    self.ls.release()
1441
    self._waitThreads()
1442
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1443
    # reset lockset
1444
    self._setUpLS()
1445

    
1446
  @_Repeat
1447
  def testConcurrentExclusiveSetLock(self):
1448
    # acquire the set-lock...
1449
    self.ls.acquire(None, shared=0)
1450
    # ...no one can do anything else
1451
    self._addThread(target=self._doLockSet, args=(None, 1))
1452
    self._addThread(target=self._doLockSet, args=(None, 0))
1453
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1454
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1455
    self._addThread(target=self._doAddSet, args=(['nine']))
1456
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1457
    self.ls.release()
1458
    self._waitThreads()
1459
    for _ in range(5):
1460
      self.assertEqual(self.done.get(True, 1), 'DONE')
1461
    # cleanup
1462
    self._setUpLS()
1463

    
1464
  @_Repeat
1465
  def testConcurrentSetLockAdd(self):
1466
    self.ls.acquire('one')
1467
    # Another thread wants the whole SetLock
1468
    self._addThread(target=self._doLockSet, args=(None, 0))
1469
    self._addThread(target=self._doLockSet, args=(None, 1))
1470
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1471
    self.assertRaises(AssertionError, self.ls.add, 'four')
1472
    self.ls.release()
1473
    self._waitThreads()
1474
    self.assertEqual(self.done.get_nowait(), 'DONE')
1475
    self.assertEqual(self.done.get_nowait(), 'DONE')
1476
    self.ls.acquire(None)
1477
    self._addThread(target=self._doLockSet, args=(None, 0))
1478
    self._addThread(target=self._doLockSet, args=(None, 1))
1479
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1480
    self.ls.add('four')
1481
    self.ls.add('five', acquired=1)
1482
    self.ls.add('six', acquired=1, shared=1)
1483
    self.assertEquals(self.ls.list_owned(),
1484
      set(['one', 'two', 'three', 'five', 'six']))
1485
    self.assertEquals(self.ls.is_owned(), True)
1486
    self.assertEquals(self.ls._names(),
1487
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1488
    self.ls.release()
1489
    self._waitThreads()
1490
    self.assertEqual(self.done.get_nowait(), 'DONE')
1491
    self.assertEqual(self.done.get_nowait(), 'DONE')
1492
    self._setUpLS()
1493

    
1494
  @_Repeat
1495
  def testEmptyLockSet(self):
1496
    # get the set-lock
1497
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1498
    # now empty it...
1499
    self.ls.remove(['one', 'two', 'three'])
1500
    # and adds/locks by another thread still wait
1501
    self._addThread(target=self._doAddSet, args=(['nine']))
1502
    self._addThread(target=self._doLockSet, args=(None, 1))
1503
    self._addThread(target=self._doLockSet, args=(None, 0))
1504
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1505
    self.ls.release()
1506
    self._waitThreads()
1507
    for _ in range(3):
1508
      self.assertEqual(self.done.get_nowait(), 'DONE')
1509
    # empty it again...
1510
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1511
    # now share it...
1512
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1513
    # other sharers can go, adds still wait
1514
    self._addThread(target=self._doLockSet, args=(None, 1))
1515
    self._waitThreads()
1516
    self.assertEqual(self.done.get_nowait(), 'DONE')
1517
    self._addThread(target=self._doAddSet, args=(['nine']))
1518
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1519
    self.ls.release()
1520
    self._waitThreads()
1521
    self.assertEqual(self.done.get_nowait(), 'DONE')
1522
    self._setUpLS()
1523

    
1524
  def testAcquireWithNamesDowngrade(self):
1525
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1526
    self.assertTrue(self.ls.is_owned())
1527
    self.assertFalse(self.ls._get_lock().is_owned())
1528
    self.ls.release()
1529
    self.assertFalse(self.ls.is_owned())
1530
    self.assertFalse(self.ls._get_lock().is_owned())
1531
    # Can't downgrade after releasing
1532
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1533

    
1534
  def testDowngrade(self):
1535
    # Not owning anything, must raise an exception
1536
    self.assertFalse(self.ls.is_owned())
1537
    self.assertRaises(AssertionError, self.ls.downgrade)
1538

    
1539
    self.assertFalse(compat.any(i.is_owned()
1540
                                for i in self.ls._get_lockdict().values()))
1541
    self.assertFalse(self.ls.check_owned(self.ls._names()))
1542
    for name in self.ls._names():
1543
      self.assertFalse(self.ls.check_owned(name))
1544

    
1545
    self.assertEquals(self.ls.acquire(None, shared=0),
1546
                      set(["one", "two", "three"]))
1547
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1548

    
1549
    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1550
    for name in self.ls._names():
1551
      self.assertTrue(self.ls.check_owned(name))
1552
      self.assertTrue(self.ls.check_owned(name, shared=0))
1553
      self.assertFalse(self.ls.check_owned(name, shared=1))
1554

    
1555
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1556
    self.assertTrue(compat.all(i.is_owned(shared=0)
1557
                               for i in self.ls._get_lockdict().values()))
1558

    
1559
    # Start downgrading locks
1560
    self.assertTrue(self.ls.downgrade(names=["one"]))
1561
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1562
    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1563
                               for name, lock in
1564
                                 self.ls._get_lockdict().items()))
1565

    
1566
    self.assertFalse(self.ls.check_owned("one", shared=0))
1567
    self.assertTrue(self.ls.check_owned("one", shared=1))
1568
    self.assertTrue(self.ls.check_owned("two", shared=0))
1569
    self.assertTrue(self.ls.check_owned("three", shared=0))
1570

    
1571
    # Downgrade second lock
1572
    self.assertTrue(self.ls.downgrade(names="two"))
1573
    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1574
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1575
    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1576
                               for name, lock in
1577
                                 self.ls._get_lockdict().items()))
1578

    
1579
    self.assertFalse(self.ls.check_owned("one", shared=0))
1580
    self.assertTrue(self.ls.check_owned("one", shared=1))
1581
    self.assertFalse(self.ls.check_owned("two", shared=0))
1582
    self.assertTrue(self.ls.check_owned("two", shared=1))
1583
    self.assertTrue(self.ls.check_owned("three", shared=0))
1584

    
1585
    # Downgrading the last exclusive lock to shared must downgrade the
1586
    # lockset-internal lock too
1587
    self.assertTrue(self.ls.downgrade(names="three"))
1588
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1589
    self.assertTrue(compat.all(i.is_owned(shared=1)
1590
                               for i in self.ls._get_lockdict().values()))
1591

    
1592
    # Verify owned locks
1593
    for name in self.ls._names():
1594
      self.assertTrue(self.ls.check_owned(name, shared=1))
1595

    
1596
    # Downgrading a shared lock must be a no-op
1597
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1598
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1599
    self.assertTrue(compat.all(i.is_owned(shared=1)
1600
                               for i in self.ls._get_lockdict().values()))
1601

    
1602
    self.ls.release()
1603

    
1604
  def testPriority(self):
1605
    def _Acquire(prev, next, name, priority, success_fn):
1606
      prev.wait()
1607
      self.assert_(self.ls.acquire(name, shared=0,
1608
                                   priority=priority,
1609
                                   test_notify=lambda _: next.set()))
1610
      try:
1611
        success_fn()
1612
      finally:
1613
        self.ls.release()
1614

    
1615
    # Get all in exclusive mode
1616
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1617

    
1618
    done_two = Queue.Queue(0)
1619

    
1620
    first = threading.Event()
1621
    prev = first
1622

    
1623
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1624
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1625

    
1626
    # Use a deterministic random generator
1627
    random.Random(741).shuffle(acquires)
1628

    
1629
    for (name, prio, done) in acquires:
1630
      ev = threading.Event()
1631
      self._addThread(target=_Acquire,
1632
                      args=(prev, ev, name, prio,
1633
                            compat.partial(done.put, "Prio%s" % prio)))
1634
      prev = ev
1635

    
1636
    # Start acquires
1637
    first.set()
1638

    
1639
    # Wait for last acquire to start
1640
    prev.wait()
1641

    
1642
    # Let threads acquire locks
1643
    self.ls.release()
1644

    
1645
    # Wait for threads to finish
1646
    self._waitThreads()
1647

    
1648
    for i in range(1, 33):
1649
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1650
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1651

    
1652
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1653
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1654

    
1655

    
1656
class TestGanetiLockManager(_ThreadedTestCase):
1657

    
1658
  def setUp(self):
1659
    _ThreadedTestCase.setUp(self)
1660
    self.nodes=['n1', 'n2']
1661
    self.nodegroups=['g1', 'g2']
1662
    self.instances=['i1', 'i2', 'i3']
1663
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1664
                                        self.instances)
1665

    
1666
  def tearDown(self):
1667
    # Don't try this at home...
1668
    locking.GanetiLockManager._instance = None
1669

    
1670
  def testLockingConstants(self):
1671
    # The locking library internally cheats by assuming its constants have some
1672
    # relationships with each other. Check those hold true.
1673
    # This relationship is also used in the Processor to recursively acquire
1674
    # the right locks. Again, please don't break it.
1675
    for i in range(len(locking.LEVELS)):
1676
      self.assertEqual(i, locking.LEVELS[i])
1677

    
1678
  def testDoubleGLFails(self):
1679
    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1680

    
1681
  def testLockNames(self):
1682
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1683
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1684
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1685
                     set(self.nodegroups))
1686
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1687
                     set(self.instances))
1688

    
1689
  def testInitAndResources(self):
1690
    locking.GanetiLockManager._instance = None
1691
    self.GL = locking.GanetiLockManager([], [], [])
1692
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1693
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1694
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1695
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1696

    
1697
    locking.GanetiLockManager._instance = None
1698
    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1699
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1700
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1701
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1702
                                    set(self.nodegroups))
1703
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1704

    
1705
    locking.GanetiLockManager._instance = None
1706
    self.GL = locking.GanetiLockManager([], [], self.instances)
1707
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1708
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1709
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1710
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1711
                     set(self.instances))
1712

    
1713
  def testAcquireRelease(self):
1714
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1715
    self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1716
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1717
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1718
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1719
    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
1720
                                        shared=1))
1721
    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
1722
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1723
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
1724
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1725
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1726
    self.GL.release(locking.LEVEL_NODE)
1727
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
1728
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1729
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1730
    self.GL.release(locking.LEVEL_NODEGROUP)
1731
    self.GL.release(locking.LEVEL_INSTANCE)
1732
    self.assertRaises(errors.LockError, self.GL.acquire,
1733
                      locking.LEVEL_INSTANCE, ['i5'])
1734
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1735
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1736

    
1737
  def testAcquireWholeSets(self):
1738
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1739
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1740
                      set(self.instances))
1741
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1742
                      set(self.instances))
1743
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1744
                      set(self.nodegroups))
1745
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
1746
                      set(self.nodegroups))
1747
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1748
                      set(self.nodes))
1749
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1750
                      set(self.nodes))
1751
    self.GL.release(locking.LEVEL_NODE)
1752
    self.GL.release(locking.LEVEL_NODEGROUP)
1753
    self.GL.release(locking.LEVEL_INSTANCE)
1754
    self.GL.release(locking.LEVEL_CLUSTER)
1755

    
1756
  def testAcquireWholeAndPartial(self):
1757
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1758
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1759
                      set(self.instances))
1760
    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1761
                      set(self.instances))
1762
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1763
                      set(['n2']))
1764
    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1765
                      set(['n2']))
1766
    self.GL.release(locking.LEVEL_NODE)
1767
    self.GL.release(locking.LEVEL_INSTANCE)
1768
    self.GL.release(locking.LEVEL_CLUSTER)
1769

    
1770
  def testBGLDependency(self):
1771
    self.assertRaises(AssertionError, self.GL.acquire,
1772
                      locking.LEVEL_NODE, ['n1', 'n2'])
1773
    self.assertRaises(AssertionError, self.GL.acquire,
1774
                      locking.LEVEL_INSTANCE, ['i3'])
1775
    self.assertRaises(AssertionError, self.GL.acquire,
1776
                      locking.LEVEL_NODEGROUP, ['g1'])
1777
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1778
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1779
    self.assertRaises(AssertionError, self.GL.release,
1780
                      locking.LEVEL_CLUSTER, ['BGL'])
1781
    self.assertRaises(AssertionError, self.GL.release,
1782
                      locking.LEVEL_CLUSTER)
1783
    self.GL.release(locking.LEVEL_NODE)
1784
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1785
    self.assertRaises(AssertionError, self.GL.release,
1786
                      locking.LEVEL_CLUSTER, ['BGL'])
1787
    self.assertRaises(AssertionError, self.GL.release,
1788
                      locking.LEVEL_CLUSTER)
1789
    self.GL.release(locking.LEVEL_INSTANCE)
1790
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1791
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1792
    self.assertRaises(AssertionError, self.GL.release,
1793
                      locking.LEVEL_CLUSTER, ['BGL'])
1794
    self.assertRaises(AssertionError, self.GL.release,
1795
                      locking.LEVEL_CLUSTER)
1796
    self.GL.release(locking.LEVEL_NODEGROUP)
1797
    self.GL.release(locking.LEVEL_CLUSTER)
1798

    
1799
  def testWrongOrder(self):
1800
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1801
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1802
    self.assertRaises(AssertionError, self.GL.acquire,
1803
                      locking.LEVEL_NODE, ['n1'])
1804
    self.assertRaises(AssertionError, self.GL.acquire,
1805
                      locking.LEVEL_NODEGROUP, ['g1'])
1806
    self.assertRaises(AssertionError, self.GL.acquire,
1807
                      locking.LEVEL_INSTANCE, ['i2'])
1808

    
1809
  def testModifiableLevels(self):
1810
    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1811
                      ['BGL2'])
1812
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1813
    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1814
    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1815
    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1816
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1817
    self.GL.add(locking.LEVEL_NODE, ['n3'])
1818
    self.GL.remove(locking.LEVEL_NODE, ['n1'])
1819
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1820
    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1821
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1822
    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1823
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1824
    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1825
                      ['BGL2'])
1826

    
1827
  # Helper function to run as a thread that shared the BGL and then acquires
1828
  # some locks at another level.
1829
  def _doLock(self, level, names, shared):
1830
    try:
1831
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1832
      self.GL.acquire(level, names, shared=shared)
1833
      self.done.put('DONE')
1834
      self.GL.release(level)
1835
      self.GL.release(locking.LEVEL_CLUSTER)
1836
    except errors.LockError:
1837
      self.done.put('ERR')
1838

    
1839
  @_Repeat
1840
  def testConcurrency(self):
1841
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1842
    self._addThread(target=self._doLock,
1843
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1844
    self._waitThreads()
1845
    self.assertEqual(self.done.get_nowait(), 'DONE')
1846
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1847
    self._addThread(target=self._doLock,
1848
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1849
    self._waitThreads()
1850
    self.assertEqual(self.done.get_nowait(), 'DONE')
1851
    self._addThread(target=self._doLock,
1852
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1853
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1854
    self.GL.release(locking.LEVEL_INSTANCE)
1855
    self._waitThreads()
1856
    self.assertEqual(self.done.get_nowait(), 'DONE')
1857
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1858
    self._addThread(target=self._doLock,
1859
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1860
    self._waitThreads()
1861
    self.assertEqual(self.done.get_nowait(), 'DONE')
1862
    self._addThread(target=self._doLock,
1863
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1864
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1865
    self.GL.release(locking.LEVEL_INSTANCE)
1866
    self._waitThreads()
1867
    self.assertEqual(self.done.get(True, 1), 'DONE')
1868
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1869

    
1870

    
1871
class TestLockMonitor(_ThreadedTestCase):
1872
  def setUp(self):
1873
    _ThreadedTestCase.setUp(self)
1874
    self.lm = locking.LockMonitor()
1875

    
1876
  def testSingleThread(self):
1877
    locks = []
1878

    
1879
    for i in range(100):
1880
      name = "TestLock%s" % i
1881
      locks.append(locking.SharedLock(name, monitor=self.lm))
1882

    
1883
    self.assertEqual(len(self.lm._locks), len(locks))
1884
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1885
    self.assertEqual(len(result.fields), 1)
1886
    self.assertEqual(len(result.data), 100)
1887

    
1888
    # Delete all locks
1889
    del locks[:]
1890

    
1891
    # The garbage collector might needs some time
1892
    def _CheckLocks():
1893
      if self.lm._locks:
1894
        raise utils.RetryAgain()
1895

    
1896
    utils.Retry(_CheckLocks, 0.1, 30.0)
1897

    
1898
    self.assertFalse(self.lm._locks)
1899

    
1900
  def testMultiThread(self):
1901
    locks = []
1902

    
1903
    def _CreateLock(prev, next, name):
1904
      prev.wait()
1905
      locks.append(locking.SharedLock(name, monitor=self.lm))
1906
      if next:
1907
        next.set()
1908

    
1909
    expnames = []
1910

    
1911
    first = threading.Event()
1912
    prev = first
1913

    
1914
    # Use a deterministic random generator
1915
    for i in random.Random(4263).sample(range(100), 33):
1916
      name = "MtTestLock%s" % i
1917
      expnames.append(name)
1918

    
1919
      ev = threading.Event()
1920
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1921
      prev = ev
1922

    
1923
    # Add locks
1924
    first.set()
1925
    self._waitThreads()
1926

    
1927
    # Check order in which locks were added
1928
    self.assertEqual([i.name for i in locks], expnames)
1929

    
1930
    # Check query result
1931
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1932
    self.assert_(isinstance(result, dict))
1933
    response = objects.QueryResponse.FromDict(result)
1934
    self.assertEqual(response.data,
1935
                     [[(constants.RS_NORMAL, name),
1936
                       (constants.RS_NORMAL, None),
1937
                       (constants.RS_NORMAL, None),
1938
                       (constants.RS_NORMAL, [])]
1939
                      for name in utils.NiceSort(expnames)])
1940
    self.assertEqual(len(response.fields), 4)
1941
    self.assertEqual(["name", "mode", "owner", "pending"],
1942
                     [fdef.name for fdef in response.fields])
1943

    
1944
    # Test exclusive acquire
1945
    for tlock in locks[::4]:
1946
      tlock.acquire(shared=0)
1947
      try:
1948
        def _GetExpResult(name):
1949
          if tlock.name == name:
1950
            return [(constants.RS_NORMAL, name),
1951
                    (constants.RS_NORMAL, "exclusive"),
1952
                    (constants.RS_NORMAL,
1953
                     [threading.currentThread().getName()]),
1954
                    (constants.RS_NORMAL, [])]
1955
          return [(constants.RS_NORMAL, name),
1956
                  (constants.RS_NORMAL, None),
1957
                  (constants.RS_NORMAL, None),
1958
                  (constants.RS_NORMAL, [])]
1959

    
1960
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1961
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
1962
                         [_GetExpResult(name)
1963
                          for name in utils.NiceSort(expnames)])
1964
      finally:
1965
        tlock.release()
1966

    
1967
    # Test shared acquire
1968
    def _Acquire(lock, shared, ev, notify):
1969
      lock.acquire(shared=shared)
1970
      try:
1971
        notify.set()
1972
        ev.wait()
1973
      finally:
1974
        lock.release()
1975

    
1976
    for tlock1 in locks[::11]:
1977
      for tlock2 in locks[::-15]:
1978
        if tlock2 == tlock1:
1979
          # Avoid deadlocks
1980
          continue
1981

    
1982
        for tlock3 in locks[::10]:
1983
          if tlock3 in (tlock2, tlock1):
1984
            # Avoid deadlocks
1985
            continue
1986

    
1987
          releaseev = threading.Event()
1988

    
1989
          # Acquire locks
1990
          acquireev = []
1991
          tthreads1 = []
1992
          for i in range(3):
1993
            ev = threading.Event()
1994
            tthreads1.append(self._addThread(target=_Acquire,
1995
                                             args=(tlock1, 1, releaseev, ev)))
1996
            acquireev.append(ev)
1997

    
1998
          ev = threading.Event()
1999
          tthread2 = self._addThread(target=_Acquire,
2000
                                     args=(tlock2, 1, releaseev, ev))
2001
          acquireev.append(ev)
2002

    
2003
          ev = threading.Event()
2004
          tthread3 = self._addThread(target=_Acquire,
2005
                                     args=(tlock3, 0, releaseev, ev))
2006
          acquireev.append(ev)
2007

    
2008
          # Wait for all locks to be acquired
2009
          for i in acquireev:
2010
            i.wait()
2011

    
2012
          # Check query result
2013
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2014
          response = objects.QueryResponse.FromDict(result)
2015
          for (name, mode, owner) in response.data:
2016
            (name_status, name_value) = name
2017
            (owner_status, owner_value) = owner
2018

    
2019
            self.assertEqual(name_status, constants.RS_NORMAL)
2020
            self.assertEqual(owner_status, constants.RS_NORMAL)
2021

    
2022
            if name_value == tlock1.name:
2023
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2024
              self.assertEqual(set(owner_value),
2025
                               set(i.getName() for i in tthreads1))
2026
              continue
2027

    
2028
            if name_value == tlock2.name:
2029
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2030
              self.assertEqual(owner_value, [tthread2.getName()])
2031
              continue
2032

    
2033
            if name_value == tlock3.name:
2034
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
2035
              self.assertEqual(owner_value, [tthread3.getName()])
2036
              continue
2037

    
2038
            self.assert_(name_value in expnames)
2039
            self.assertEqual(mode, (constants.RS_NORMAL, None))
2040
            self.assert_(owner_value is None)
2041

    
2042
          # Release locks again
2043
          releaseev.set()
2044

    
2045
          self._waitThreads()
2046

    
2047
          result = self.lm.QueryLocks(["name", "mode", "owner"])
2048
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
2049
                           [[(constants.RS_NORMAL, name),
2050
                             (constants.RS_NORMAL, None),
2051
                             (constants.RS_NORMAL, None)]
2052
                            for name in utils.NiceSort(expnames)])
2053

    
2054
  def testDelete(self):
2055
    lock = locking.SharedLock("TestLock", monitor=self.lm)
2056

    
2057
    self.assertEqual(len(self.lm._locks), 1)
2058
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2059
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2060
                     [[(constants.RS_NORMAL, lock.name),
2061
                       (constants.RS_NORMAL, None),
2062
                       (constants.RS_NORMAL, None)]])
2063

    
2064
    lock.delete()
2065

    
2066
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2067
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2068
                     [[(constants.RS_NORMAL, lock.name),
2069
                       (constants.RS_NORMAL, "deleted"),
2070
                       (constants.RS_NORMAL, None)]])
2071
    self.assertEqual(len(self.lm._locks), 1)
2072

    
2073
  def testPending(self):
2074
    def _Acquire(lock, shared, prev, next):
2075
      prev.wait()
2076

    
2077
      lock.acquire(shared=shared, test_notify=next.set)
2078
      try:
2079
        pass
2080
      finally:
2081
        lock.release()
2082

    
2083
    lock = locking.SharedLock("ExcLock", monitor=self.lm)
2084

    
2085
    for shared in [0, 1]:
2086
      lock.acquire()
2087
      try:
2088
        self.assertEqual(len(self.lm._locks), 1)
2089
        result = self.lm.QueryLocks(["name", "mode", "owner"])
2090
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2091
                         [[(constants.RS_NORMAL, lock.name),
2092
                           (constants.RS_NORMAL, "exclusive"),
2093
                           (constants.RS_NORMAL,
2094
                            [threading.currentThread().getName()])]])
2095

    
2096
        threads = []
2097

    
2098
        first = threading.Event()
2099
        prev = first
2100

    
2101
        for i in range(5):
2102
          ev = threading.Event()
2103
          threads.append(self._addThread(target=_Acquire,
2104
                                          args=(lock, shared, prev, ev)))
2105
          prev = ev
2106

    
2107
        # Start acquires
2108
        first.set()
2109

    
2110
        # Wait for last acquire to start waiting
2111
        prev.wait()
2112

    
2113
        # NOTE: This works only because QueryLocks will acquire the
2114
        # lock-internal lock again and won't be able to get the information
2115
        # until it has the lock. By then the acquire should be registered in
2116
        # SharedLock.__pending (otherwise it's a bug).
2117

    
2118
        # All acquires are waiting now
2119
        if shared:
2120
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2121
        else:
2122
          pending = [("exclusive", [t.getName()]) for t in threads]
2123

    
2124
        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2125
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
2126
                         [[(constants.RS_NORMAL, lock.name),
2127
                           (constants.RS_NORMAL, "exclusive"),
2128
                           (constants.RS_NORMAL,
2129
                            [threading.currentThread().getName()]),
2130
                           (constants.RS_NORMAL, pending)]])
2131

    
2132
        self.assertEqual(len(self.lm._locks), 1)
2133
      finally:
2134
        lock.release()
2135

    
2136
      self._waitThreads()
2137

    
2138
      # No pending acquires
2139
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2140
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2141
                       [[(constants.RS_NORMAL, lock.name),
2142
                         (constants.RS_NORMAL, None),
2143
                         (constants.RS_NORMAL, None),
2144
                         (constants.RS_NORMAL, [])]])
2145

    
2146
      self.assertEqual(len(self.lm._locks), 1)
2147

    
2148
  def testDeleteAndRecreate(self):
2149
    lname = "TestLock101923193"
2150

    
2151
    # Create some locks with the same name and keep all references
2152
    locks = [locking.SharedLock(lname, monitor=self.lm)
2153
             for _ in range(5)]
2154

    
2155
    self.assertEqual(len(self.lm._locks), len(locks))
2156

    
2157
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2158
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2159
                     [[(constants.RS_NORMAL, lname),
2160
                       (constants.RS_NORMAL, None),
2161
                       (constants.RS_NORMAL, None)]] * 5)
2162

    
2163
    locks[2].delete()
2164

    
2165
    # Check information order
2166
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2167
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
2168
                     [[(constants.RS_NORMAL, lname),
2169
                       (constants.RS_NORMAL, None),
2170
                       (constants.RS_NORMAL, None)]] * 2 +
2171
                     [[(constants.RS_NORMAL, lname),
2172
                       (constants.RS_NORMAL, "deleted"),
2173
                       (constants.RS_NORMAL, None)]] +
2174
                     [[(constants.RS_NORMAL, lname),
2175
                       (constants.RS_NORMAL, None),
2176
                       (constants.RS_NORMAL, None)]] * 2)
2177

    
2178
    locks[1].acquire(shared=0)
2179

    
2180
    last_status = [
2181
      [(constants.RS_NORMAL, lname),
2182
       (constants.RS_NORMAL, None),
2183
       (constants.RS_NORMAL, None)],
2184
      [(constants.RS_NORMAL, lname),
2185
       (constants.RS_NORMAL, "exclusive"),
2186
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
2187
      [(constants.RS_NORMAL, lname),
2188
       (constants.RS_NORMAL, "deleted"),
2189
       (constants.RS_NORMAL, None)],
2190
      [(constants.RS_NORMAL, lname),
2191
       (constants.RS_NORMAL, None),
2192
       (constants.RS_NORMAL, None)],
2193
      [(constants.RS_NORMAL, lname),
2194
       (constants.RS_NORMAL, None),
2195
       (constants.RS_NORMAL, None)],
2196
      ]
2197

    
2198
    # Check information order
2199
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2200
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2201

    
2202
    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2203
    self.assertEqual(len(self.lm._locks), len(locks))
2204

    
2205
    # Check lock deletion
2206
    for idx in range(len(locks)):
2207
      del locks[0]
2208
      assert gc.isenabled()
2209
      gc.collect()
2210
      self.assertEqual(len(self.lm._locks), len(locks))
2211
      result = self.lm.QueryLocks(["name", "mode", "owner"])
2212
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
2213
                       last_status[idx + 1:])
2214

    
2215
    # All locks should have been deleted
2216
    assert not locks
2217
    self.assertFalse(self.lm._locks)
2218

    
2219
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2220
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2221

    
2222
  class _FakeLock:
2223
    def __init__(self):
2224
      self._info = []
2225

    
2226
    def AddResult(self, *args):
2227
      self._info.append(args)
2228

    
2229
    def CountPending(self):
2230
      return len(self._info)
2231

    
2232
    def GetLockInfo(self, requested):
2233
      (exp_requested, result) = self._info.pop(0)
2234

    
2235
      if exp_requested != requested:
2236
        raise Exception("Requested information (%s) does not match"
2237
                        " expectations (%s)" % (requested, exp_requested))
2238

    
2239
      return result
2240

    
2241
  def testMultipleResults(self):
2242
    fl1 = self._FakeLock()
2243
    fl2 = self._FakeLock()
2244

    
2245
    self.lm.RegisterLock(fl1)
2246
    self.lm.RegisterLock(fl2)
2247

    
2248
    # Empty information
2249
    for i in [fl1, fl2]:
2250
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2251
    result = self.lm.QueryLocks(["name", "mode", "owner"])
2252
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2253
    for i in [fl1, fl2]:
2254
      self.assertEqual(i.CountPending(), 0)
2255

    
2256
    # Check ordering
2257
    for fn in [lambda x: x, reversed, sorted]:
2258
      fl1.AddResult(set(), list(fn([
2259
        ("aaa", None, None, None),
2260
        ("bbb", None, None, None),
2261
        ])))
2262
      fl2.AddResult(set(), [])
2263
      result = self.lm.QueryLocks(["name"])
2264
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2265
        [(constants.RS_NORMAL, "aaa")],
2266
        [(constants.RS_NORMAL, "bbb")],
2267
        ])
2268
      for i in [fl1, fl2]:
2269
        self.assertEqual(i.CountPending(), 0)
2270

    
2271
      for fn2 in [lambda x: x, reversed, sorted]:
2272
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
2273
          # Same name, but different information
2274
          ("aaa", "mode0", None, None),
2275
          ("aaa", "mode1", None, None),
2276
          ("aaa", "mode2", None, None),
2277
          ("aaa", "mode3", None, None),
2278
          ])))
2279
        fl2.AddResult(set([query.LQ_MODE]), [
2280
          ("zzz", "end", None, None),
2281
          ("000", "start", None, None),
2282
          ] + list(fn2([
2283
          ("aaa", "b200", None, None),
2284
          ("aaa", "b300", None, None),
2285
          ])))
2286
        result = self.lm.QueryLocks(["name", "mode"])
2287
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2288
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2289
          ] + list(fn([
2290
          # Name is the same, so order must be equal to incoming order
2291
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2292
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2293
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2294
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2295
          ])) + list(fn2([
2296
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2297
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2298
          ])) + [
2299
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2300
          ])
2301
        for i in [fl1, fl2]:
2302
          self.assertEqual(i.CountPending(), 0)
2303

    
2304

    
2305
if __name__ == '__main__':
2306
  testutils.GanetiTestProgram()