Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.locking_unittest.py @ 48dabc6a

History | View | Annotate | Download (43.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

    
21

    
22
"""Script for unittesting the locking module"""
23

    
24

    
25
import os
26
import unittest
27
import time
28
import Queue
29
import threading
30

    
31
from ganeti import locking
32
from ganeti import errors
33

    
34

    
35
# This is used to test the ssynchronize decorator.
36
# Since it's passed as input to a decorator it must be declared as a global.
37
_decoratorlock = locking.SharedLock()
38

    
39
#: List for looping tests
40
ITERATIONS = range(8)
41

    
42

    
43
def _Repeat(fn):
44
  """Decorator for executing a function many times"""
45
  def wrapper(*args, **kwargs):
46
    for i in ITERATIONS:
47
      fn(*args, **kwargs)
48
  return wrapper
49

    
50

    
51
class _ThreadedTestCase(unittest.TestCase):
52
  """Test class that supports adding/waiting on threads"""
53
  def setUp(self):
54
    unittest.TestCase.setUp(self)
55
    self.threads = []
56

    
57
  def _addThread(self, *args, **kwargs):
58
    """Create and remember a new thread"""
59
    t = threading.Thread(*args, **kwargs)
60
    self.threads.append(t)
61
    t.start()
62
    return t
63

    
64
  def _waitThreads(self):
65
    """Wait for all our threads to finish"""
66
    for t in self.threads:
67
      t.join(60)
68
      self.failIf(t.isAlive())
69
    self.threads = []
70

    
71

    
72
class TestPipeCondition(_ThreadedTestCase):
73
  """_PipeCondition tests"""
74

    
75
  def setUp(self):
76
    _ThreadedTestCase.setUp(self)
77
    self.lock = threading.Lock()
78
    self.cond = locking._PipeCondition(self.lock)
79
    self.done = Queue.Queue(0)
80

    
81
  def testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

    
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

    
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

    
96
  def testNotification(self):
97
    def _NotifyAll():
98
      self.cond.acquire()
99
      self.cond.notifyAll()
100
      self.cond.release()
101

    
102
    self.cond.acquire()
103
    self._addThread(target=_NotifyAll)
104
    self.cond.wait()
105
    self.assert_(self.cond._is_owned())
106
    self.cond.release()
107
    self.assert_(not self.cond._is_owned())
108

    
109
  def _TestWait(self, fn):
110
    self._addThread(target=fn)
111
    self._addThread(target=fn)
112
    self._addThread(target=fn)
113

    
114
    # Wait for threads to be waiting
115
    self.assertEqual(self.done.get(True, 1), "A")
116
    self.assertEqual(self.done.get(True, 1), "A")
117
    self.assertEqual(self.done.get(True, 1), "A")
118

    
119
    self.assertRaises(Queue.Empty, self.done.get_nowait)
120

    
121
    self.cond.acquire()
122
    self.assertEqual(self.cond._nwaiters, 3)
123
    # This new thread can"t acquire the lock, and thus call wait, before we
124
    # release it
125
    self._addThread(target=fn)
126
    self.cond.notifyAll()
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.release()
129

    
130
    # We should now get 3 W and 1 A (for the new thread) in whatever order
131
    w = 0
132
    a = 0
133
    for i in range(4):
134
      got = self.done.get(True, 1)
135
      if got == "W":
136
        w += 1
137
      elif got == "A":
138
        a += 1
139
      else:
140
        self.fail("Got %s on the done queue" % got)
141

    
142
    self.assertEqual(w, 3)
143
    self.assertEqual(a, 1)
144

    
145
    self.cond.acquire()
146
    self.cond.notifyAll()
147
    self.cond.release()
148
    self._waitThreads()
149
    self.assertEqual(self.done.get_nowait(), "W")
150
    self.assertRaises(Queue.Empty, self.done.get_nowait)
151

    
152
  def testBlockingWait(self):
153
    def _BlockingWait():
154
      self.cond.acquire()
155
      self.done.put("A")
156
      self.cond.wait()
157
      self.cond.release()
158
      self.done.put("W")
159

    
160
    self._TestWait(_BlockingWait)
161

    
162
  def testLongTimeoutWait(self):
163
    def _Helper():
164
      self.cond.acquire()
165
      self.done.put("A")
166
      self.cond.wait(15.0)
167
      self.cond.release()
168
      self.done.put("W")
169

    
170
    self._TestWait(_Helper)
171

    
172
  def _TimeoutWait(self, timeout, check):
173
    self.cond.acquire()
174
    self.cond.wait(timeout)
175
    self.cond.release()
176
    self.done.put(check)
177

    
178
  def testShortTimeoutWait(self):
179
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
180
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
181
    self._waitThreads()
182
    self.assertEqual(self.done.get_nowait(), "T1")
183
    self.assertEqual(self.done.get_nowait(), "T1")
184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
185

    
186
  def testZeroTimeoutWait(self):
187
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
188
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
189
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
190
    self._waitThreads()
191
    self.assertEqual(self.done.get_nowait(), "T0")
192
    self.assertEqual(self.done.get_nowait(), "T0")
193
    self.assertEqual(self.done.get_nowait(), "T0")
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195

    
196

    
197
class TestSingleActionPipeCondition(unittest.TestCase):
198
  """_SingleActionPipeCondition tests"""
199

    
200
  def setUp(self):
201
    self.cond = locking._SingleActionPipeCondition()
202

    
203
  def testInitialization(self):
204
    self.assert_(self.cond._read_fd is not None)
205
    self.assert_(self.cond._write_fd is not None)
206
    self.assert_(self.cond._poller is not None)
207
    self.assertEqual(self.cond._nwaiters, 0)
208

    
209
  def testUsageCount(self):
210
    self.cond.StartWaiting()
211
    self.assert_(self.cond._read_fd is not None)
212
    self.assert_(self.cond._write_fd is not None)
213
    self.assert_(self.cond._poller is not None)
214
    self.assertEqual(self.cond._nwaiters, 1)
215

    
216
    # use again
217
    self.cond.StartWaiting()
218
    self.assertEqual(self.cond._nwaiters, 2)
219

    
220
    # there is more than one user
221
    self.assert_(not self.cond.DoneWaiting())
222
    self.assert_(self.cond._read_fd is not None)
223
    self.assert_(self.cond._write_fd is not None)
224
    self.assert_(self.cond._poller is not None)
225
    self.assertEqual(self.cond._nwaiters, 1)
226

    
227
    self.assert_(self.cond.DoneWaiting())
228
    self.assertEqual(self.cond._nwaiters, 0)
229
    self.assert_(self.cond._read_fd is None)
230
    self.assert_(self.cond._write_fd is None)
231
    self.assert_(self.cond._poller is None)
232

    
233
  def testNotify(self):
234
    wait1 = self.cond.StartWaiting()
235
    wait2 = self.cond.StartWaiting()
236

    
237
    self.assert_(self.cond._read_fd is not None)
238
    self.assert_(self.cond._write_fd is not None)
239
    self.assert_(self.cond._poller is not None)
240

    
241
    self.cond.notifyAll()
242

    
243
    self.assert_(self.cond._read_fd is not None)
244
    self.assert_(self.cond._write_fd is None)
245
    self.assert_(self.cond._poller is not None)
246

    
247
    self.assert_(not self.cond.DoneWaiting())
248

    
249
    self.assert_(self.cond._read_fd is not None)
250
    self.assert_(self.cond._write_fd is None)
251
    self.assert_(self.cond._poller is not None)
252

    
253
    self.assert_(self.cond.DoneWaiting())
254

    
255
    self.assert_(self.cond._read_fd is None)
256
    self.assert_(self.cond._write_fd is None)
257
    self.assert_(self.cond._poller is None)
258

    
259
  def testReusage(self):
260
    self.cond.StartWaiting()
261
    self.assert_(self.cond._read_fd is not None)
262
    self.assert_(self.cond._write_fd is not None)
263
    self.assert_(self.cond._poller is not None)
264

    
265
    self.assert_(self.cond.DoneWaiting())
266

    
267
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
268
    self.assert_(self.cond._read_fd is None)
269
    self.assert_(self.cond._write_fd is None)
270
    self.assert_(self.cond._poller is None)
271

    
272
  def testNotifyTwice(self):
273
    self.cond.notifyAll()
274
    self.assertRaises(RuntimeError, self.cond.notifyAll)
275

    
276

    
277
class TestSharedLock(_ThreadedTestCase):
278
  """SharedLock tests"""
279

    
280
  def setUp(self):
281
    _ThreadedTestCase.setUp(self)
282
    self.sl = locking.SharedLock()
283
    # helper threads use the 'done' queue to tell the master they finished.
284
    self.done = Queue.Queue(0)
285

    
286
  def testSequenceAndOwnership(self):
287
    self.assert_(not self.sl._is_owned())
288
    self.sl.acquire(shared=1)
289
    self.assert_(self.sl._is_owned())
290
    self.assert_(self.sl._is_owned(shared=1))
291
    self.assert_(not self.sl._is_owned(shared=0))
292
    self.sl.release()
293
    self.assert_(not self.sl._is_owned())
294
    self.sl.acquire()
295
    self.assert_(self.sl._is_owned())
296
    self.assert_(not self.sl._is_owned(shared=1))
297
    self.assert_(self.sl._is_owned(shared=0))
298
    self.sl.release()
299
    self.assert_(not self.sl._is_owned())
300
    self.sl.acquire(shared=1)
301
    self.assert_(self.sl._is_owned())
302
    self.assert_(self.sl._is_owned(shared=1))
303
    self.assert_(not self.sl._is_owned(shared=0))
304
    self.sl.release()
305
    self.assert_(not self.sl._is_owned())
306

    
307
  def testBooleanValue(self):
308
    # semaphores are supposed to return a true value on a successful acquire
309
    self.assert_(self.sl.acquire(shared=1))
310
    self.sl.release()
311
    self.assert_(self.sl.acquire())
312
    self.sl.release()
313

    
314
  def testDoubleLockingStoE(self):
315
    self.sl.acquire(shared=1)
316
    self.assertRaises(AssertionError, self.sl.acquire)
317

    
318
  def testDoubleLockingEtoS(self):
319
    self.sl.acquire()
320
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
321

    
322
  def testDoubleLockingStoS(self):
323
    self.sl.acquire(shared=1)
324
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
325

    
326
  def testDoubleLockingEtoE(self):
327
    self.sl.acquire()
328
    self.assertRaises(AssertionError, self.sl.acquire)
329

    
330
  # helper functions: called in a separate thread they acquire the lock, send
331
  # their identifier on the done queue, then release it.
332
  def _doItSharer(self):
333
    try:
334
      self.sl.acquire(shared=1)
335
      self.done.put('SHR')
336
      self.sl.release()
337
    except errors.LockError:
338
      self.done.put('ERR')
339

    
340
  def _doItExclusive(self):
341
    try:
342
      self.sl.acquire()
343
      self.done.put('EXC')
344
      self.sl.release()
345
    except errors.LockError:
346
      self.done.put('ERR')
347

    
348
  def _doItDelete(self):
349
    try:
350
      self.sl.delete()
351
      self.done.put('DEL')
352
    except errors.LockError:
353
      self.done.put('ERR')
354

    
355
  def testSharersCanCoexist(self):
356
    self.sl.acquire(shared=1)
357
    threading.Thread(target=self._doItSharer).start()
358
    self.assert_(self.done.get(True, 1))
359
    self.sl.release()
360

    
361
  @_Repeat
362
  def testExclusiveBlocksExclusive(self):
363
    self.sl.acquire()
364
    self._addThread(target=self._doItExclusive)
365
    self.assertRaises(Queue.Empty, self.done.get_nowait)
366
    self.sl.release()
367
    self._waitThreads()
368
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
369

    
370
  @_Repeat
371
  def testExclusiveBlocksDelete(self):
372
    self.sl.acquire()
373
    self._addThread(target=self._doItDelete)
374
    self.assertRaises(Queue.Empty, self.done.get_nowait)
375
    self.sl.release()
376
    self._waitThreads()
377
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
378
    self.sl = locking.SharedLock()
379

    
380
  @_Repeat
381
  def testExclusiveBlocksSharer(self):
382
    self.sl.acquire()
383
    self._addThread(target=self._doItSharer)
384
    self.assertRaises(Queue.Empty, self.done.get_nowait)
385
    self.sl.release()
386
    self._waitThreads()
387
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
388

    
389
  @_Repeat
390
  def testSharerBlocksExclusive(self):
391
    self.sl.acquire(shared=1)
392
    self._addThread(target=self._doItExclusive)
393
    self.assertRaises(Queue.Empty, self.done.get_nowait)
394
    self.sl.release()
395
    self._waitThreads()
396
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
397

    
398
  @_Repeat
399
  def testSharerBlocksDelete(self):
400
    self.sl.acquire(shared=1)
401
    self._addThread(target=self._doItDelete)
402
    self.assertRaises(Queue.Empty, self.done.get_nowait)
403
    self.sl.release()
404
    self._waitThreads()
405
    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
406
    self.sl = locking.SharedLock()
407

    
408
  @_Repeat
409
  def testWaitingExclusiveBlocksSharer(self):
410
    """SKIPPED testWaitingExclusiveBlockSharer"""
411
    return
412

    
413
    self.sl.acquire(shared=1)
414
    # the lock is acquired in shared mode...
415
    self._addThread(target=self._doItExclusive)
416
    # ...but now an exclusive is waiting...
417
    self._addThread(target=self._doItSharer)
418
    # ...so the sharer should be blocked as well
419
    self.assertRaises(Queue.Empty, self.done.get_nowait)
420
    self.sl.release()
421
    self._waitThreads()
422
    # The exclusive passed before
423
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
424
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
425

    
426
  @_Repeat
427
  def testWaitingSharerBlocksExclusive(self):
428
    """SKIPPED testWaitingSharerBlocksExclusive"""
429
    return
430

    
431
    self.sl.acquire()
432
    # the lock is acquired in exclusive mode...
433
    self._addThread(target=self._doItSharer)
434
    # ...but now a sharer is waiting...
435
    self._addThread(target=self._doItExclusive)
436
    # ...the exclusive is waiting too...
437
    self.assertRaises(Queue.Empty, self.done.get_nowait)
438
    self.sl.release()
439
    self._waitThreads()
440
    # The sharer passed before
441
    self.assertEqual(self.done.get_nowait(), 'SHR')
442
    self.assertEqual(self.done.get_nowait(), 'EXC')
443

    
444
  def testDelete(self):
445
    self.sl.delete()
446
    self.assertRaises(errors.LockError, self.sl.acquire)
447
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
448
    self.assertRaises(errors.LockError, self.sl.delete)
449

    
450
  def testNoDeleteIfSharer(self):
451
    self.sl.acquire(shared=1)
452
    self.assertRaises(AssertionError, self.sl.delete)
453

    
454
  @_Repeat
455
  def testDeletePendingSharersExclusiveDelete(self):
456
    self.sl.acquire()
457
    self._addThread(target=self._doItSharer)
458
    self._addThread(target=self._doItSharer)
459
    self._addThread(target=self._doItExclusive)
460
    self._addThread(target=self._doItDelete)
461
    self.sl.delete()
462
    self._waitThreads()
463
    # The threads who were pending return ERR
464
    for _ in range(4):
465
      self.assertEqual(self.done.get_nowait(), 'ERR')
466
    self.sl = locking.SharedLock()
467

    
468
  @_Repeat
469
  def testDeletePendingDeleteExclusiveSharers(self):
470
    self.sl.acquire()
471
    self._addThread(target=self._doItDelete)
472
    self._addThread(target=self._doItExclusive)
473
    self._addThread(target=self._doItSharer)
474
    self._addThread(target=self._doItSharer)
475
    self.sl.delete()
476
    self._waitThreads()
477
    # The two threads who were pending return both ERR
478
    self.assertEqual(self.done.get_nowait(), 'ERR')
479
    self.assertEqual(self.done.get_nowait(), 'ERR')
480
    self.assertEqual(self.done.get_nowait(), 'ERR')
481
    self.assertEqual(self.done.get_nowait(), 'ERR')
482
    self.sl = locking.SharedLock()
483

    
484
  @_Repeat
485
  def testExclusiveAcquireTimeout(self):
486
    def _LockExclusive(wait):
487
      self.sl.acquire(shared=0)
488
      self.done.put("A: start sleep")
489
      time.sleep(wait)
490
      self.done.put("A: end sleep")
491
      self.sl.release()
492

    
493
    for shared in [0, 1]:
494
      # Start thread to hold lock for 20 ms
495
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
496

    
497
      # Wait up to 100 ms to get lock
498
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
499
      self.done.put("got 2nd")
500
      self.sl.release()
501

    
502
      self._waitThreads()
503

    
504
      self.assertEqual(self.done.get_nowait(), "A: start sleep")
505
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
506
      self.assertEqual(self.done.get_nowait(), "got 2nd")
507
      self.assertRaises(Queue.Empty, self.done.get_nowait)
508

    
509
  @_Repeat
510
  def testAcquireExpiringTimeout(self):
511
    def _AcquireWithTimeout(shared, timeout):
512
      if not self.sl.acquire(shared=shared, timeout=timeout):
513
        self.done.put("timeout")
514

    
515
    for shared in [0, 1]:
516
      # Lock exclusively
517
      self.sl.acquire()
518

    
519
      # Start shared acquires with timeout between 0 and 20 ms
520
      for i in xrange(11):
521
        self._addThread(target=_AcquireWithTimeout,
522
                        args=(shared, i * 2.0 / 1000.0))
523

    
524
      # Wait for threads to finish (makes sure the acquire timeout expires
525
      # before releasing the lock)
526
      self._waitThreads()
527

    
528
      # Release lock
529
      self.sl.release()
530

    
531
      for _ in xrange(11):
532
        self.assertEqual(self.done.get_nowait(), "timeout")
533

    
534
      self.assertRaises(Queue.Empty, self.done.get_nowait)
535

    
536
  @_Repeat
537
  def testSharedSkipExclusiveAcquires(self):
538
    # Tests whether shared acquires jump in front of exclusive acquires in the
539
    # queue.
540

    
541
    # Get exclusive lock while we fill the queue
542
    self.sl.acquire()
543

    
544
    def _Acquire(shared, name):
545
      if not self.sl.acquire(shared=shared):
546
        return
547

    
548
      self.done.put(name)
549
      self.sl.release()
550

    
551
    # Start shared acquires
552
    for _ in xrange(5):
553
      self._addThread(target=_Acquire, args=(1, "shared A"))
554

    
555
    # Start exclusive acquires
556
    for _ in xrange(3):
557
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
558

    
559
    # More shared acquires
560
    for _ in xrange(5):
561
      self._addThread(target=_Acquire, args=(1, "shared C"))
562

    
563
    # More exclusive acquires
564
    for _ in xrange(3):
565
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
566

    
567
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
568
    # together
569
    self.assertEqual(self.sl._count_pending(), 7)
570

    
571
    # Release exclusive lock and wait
572
    self.sl.release()
573

    
574
    self._waitThreads()
575

    
576
    # Check sequence
577
    for _ in xrange(10):
578
      # Shared locks aren't guaranteed to be notified in order, but they'll be
579
      # first
580
      self.assert_(self.done.get_nowait() in ("shared A", "shared C"))
581

    
582
    for _ in xrange(3):
583
      self.assertEqual(self.done.get_nowait(), "exclusive B")
584

    
585
    for _ in xrange(3):
586
      self.assertEqual(self.done.get_nowait(), "exclusive D")
587

    
588
    self.assertRaises(Queue.Empty, self.done.get_nowait)
589

    
590
  @_Repeat
591
  def testMixedAcquireTimeout(self):
592
    sync = threading.Condition()
593

    
594
    def _AcquireShared(ev):
595
      if not self.sl.acquire(shared=1, timeout=None):
596
        return
597

    
598
      self.done.put("shared")
599

    
600
      # Notify main thread
601
      ev.set()
602

    
603
      # Wait for notification
604
      sync.acquire()
605
      try:
606
        sync.wait()
607
      finally:
608
        sync.release()
609

    
610
      # Release lock
611
      self.sl.release()
612

    
613
    acquires = []
614
    for _ in xrange(3):
615
      ev = threading.Event()
616
      self._addThread(target=_AcquireShared, args=(ev, ))
617
      acquires.append(ev)
618

    
619
    # Wait for all acquires to finish
620
    for i in acquires:
621
      i.wait()
622

    
623
    self.assertEqual(self.sl._count_pending(), 0)
624

    
625
    # Try to get exclusive lock
626
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
627

    
628
    # Acquire exclusive without timeout
629
    exclsync = threading.Condition()
630
    exclev = threading.Event()
631

    
632
    def _AcquireExclusive():
633
      if not self.sl.acquire(shared=0):
634
        return
635

    
636
      self.done.put("exclusive")
637

    
638
      # Notify main thread
639
      exclev.set()
640

    
641
      exclsync.acquire()
642
      try:
643
        exclsync.wait()
644
      finally:
645
        exclsync.release()
646

    
647
      self.sl.release()
648

    
649
    self._addThread(target=_AcquireExclusive)
650

    
651
    # Try to get exclusive lock
652
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
653

    
654
    # Make all shared holders release their locks
655
    sync.acquire()
656
    try:
657
      sync.notifyAll()
658
    finally:
659
      sync.release()
660

    
661
    # Wait for exclusive acquire to succeed
662
    exclev.wait()
663

    
664
    self.assertEqual(self.sl._count_pending(), 0)
665

    
666
    # Try to get exclusive lock
667
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
668

    
669
    def _AcquireSharedSimple():
670
      if self.sl.acquire(shared=1, timeout=None):
671
        self.done.put("shared2")
672
        self.sl.release()
673

    
674
    for _ in xrange(10):
675
      self._addThread(target=_AcquireSharedSimple)
676

    
677
    # Tell exclusive lock to release
678
    exclsync.acquire()
679
    try:
680
      exclsync.notifyAll()
681
    finally:
682
      exclsync.release()
683

    
684
    # Wait for everything to finish
685
    self._waitThreads()
686

    
687
    self.assertEqual(self.sl._count_pending(), 0)
688

    
689
    # Check sequence
690
    for _ in xrange(3):
691
      self.assertEqual(self.done.get_nowait(), "shared")
692

    
693
    self.assertEqual(self.done.get_nowait(), "exclusive")
694

    
695
    for _ in xrange(10):
696
      self.assertEqual(self.done.get_nowait(), "shared2")
697

    
698
    self.assertRaises(Queue.Empty, self.done.get_nowait)
699

    
700

    
701
class TestSSynchronizedDecorator(_ThreadedTestCase):
702
  """Shared Lock Synchronized decorator test"""
703

    
704
  def setUp(self):
705
    _ThreadedTestCase.setUp(self)
706
    # helper threads use the 'done' queue to tell the master they finished.
707
    self.done = Queue.Queue(0)
708

    
709
  @locking.ssynchronized(_decoratorlock)
710
  def _doItExclusive(self):
711
    self.assert_(_decoratorlock._is_owned())
712
    self.done.put('EXC')
713

    
714
  @locking.ssynchronized(_decoratorlock, shared=1)
715
  def _doItSharer(self):
716
    self.assert_(_decoratorlock._is_owned(shared=1))
717
    self.done.put('SHR')
718

    
719
  def testDecoratedFunctions(self):
720
    self._doItExclusive()
721
    self.assert_(not _decoratorlock._is_owned())
722
    self._doItSharer()
723
    self.assert_(not _decoratorlock._is_owned())
724

    
725
  def testSharersCanCoexist(self):
726
    _decoratorlock.acquire(shared=1)
727
    threading.Thread(target=self._doItSharer).start()
728
    self.assert_(self.done.get(True, 1))
729
    _decoratorlock.release()
730

    
731
  @_Repeat
732
  def testExclusiveBlocksExclusive(self):
733
    _decoratorlock.acquire()
734
    self._addThread(target=self._doItExclusive)
735
    # give it a bit of time to check that it's not actually doing anything
736
    self.assertRaises(Queue.Empty, self.done.get_nowait)
737
    _decoratorlock.release()
738
    self._waitThreads()
739
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
740

    
741
  @_Repeat
742
  def testExclusiveBlocksSharer(self):
743
    _decoratorlock.acquire()
744
    self._addThread(target=self._doItSharer)
745
    self.assertRaises(Queue.Empty, self.done.get_nowait)
746
    _decoratorlock.release()
747
    self._waitThreads()
748
    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
749

    
750
  @_Repeat
751
  def testSharerBlocksExclusive(self):
752
    _decoratorlock.acquire(shared=1)
753
    self._addThread(target=self._doItExclusive)
754
    self.assertRaises(Queue.Empty, self.done.get_nowait)
755
    _decoratorlock.release()
756
    self._waitThreads()
757
    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
758

    
759

    
760
class TestLockSet(_ThreadedTestCase):
761
  """LockSet tests"""
762

    
763
  def setUp(self):
764
    _ThreadedTestCase.setUp(self)
765
    self._setUpLS()
766
    # helper threads use the 'done' queue to tell the master they finished.
767
    self.done = Queue.Queue(0)
768

    
769
  def _setUpLS(self):
770
    """Helper to (re)initialize the lock set"""
771
    self.resources = ['one', 'two', 'three']
772
    self.ls = locking.LockSet(members=self.resources)
773

    
774
  def testResources(self):
775
    self.assertEquals(self.ls._names(), set(self.resources))
776
    newls = locking.LockSet()
777
    self.assertEquals(newls._names(), set())
778

    
779
  def testAcquireRelease(self):
780
    self.assert_(self.ls.acquire('one'))
781
    self.assertEquals(self.ls._list_owned(), set(['one']))
782
    self.ls.release()
783
    self.assertEquals(self.ls._list_owned(), set())
784
    self.assertEquals(self.ls.acquire(['one']), set(['one']))
785
    self.assertEquals(self.ls._list_owned(), set(['one']))
786
    self.ls.release()
787
    self.assertEquals(self.ls._list_owned(), set())
788
    self.ls.acquire(['one', 'two', 'three'])
789
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
790
    self.ls.release('one')
791
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
792
    self.ls.release(['three'])
793
    self.assertEquals(self.ls._list_owned(), set(['two']))
794
    self.ls.release()
795
    self.assertEquals(self.ls._list_owned(), set())
796
    self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
797
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
798
    self.ls.release()
799
    self.assertEquals(self.ls._list_owned(), set())
800

    
801
  def testNoDoubleAcquire(self):
802
    self.ls.acquire('one')
803
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
804
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
805
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
806
    self.ls.release()
807
    self.ls.acquire(['one', 'three'])
808
    self.ls.release('one')
809
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
810
    self.ls.release('three')
811

    
812
  def testNoWrongRelease(self):
813
    self.assertRaises(AssertionError, self.ls.release)
814
    self.ls.acquire('one')
815
    self.assertRaises(AssertionError, self.ls.release, 'two')
816

    
817
  def testAddRemove(self):
818
    self.ls.add('four')
819
    self.assertEquals(self.ls._list_owned(), set())
820
    self.assert_('four' in self.ls._names())
821
    self.ls.add(['five', 'six', 'seven'], acquired=1)
822
    self.assert_('five' in self.ls._names())
823
    self.assert_('six' in self.ls._names())
824
    self.assert_('seven' in self.ls._names())
825
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
826
    self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
827
    self.assert_('five' not in self.ls._names())
828
    self.assert_('six' not in self.ls._names())
829
    self.assertEquals(self.ls._list_owned(), set(['seven']))
830
    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
831
    self.ls.remove('seven')
832
    self.assert_('seven' not in self.ls._names())
833
    self.assertEquals(self.ls._list_owned(), set([]))
834
    self.ls.acquire(None, shared=1)
835
    self.assertRaises(AssertionError, self.ls.add, 'eight')
836
    self.ls.release()
837
    self.ls.acquire(None)
838
    self.ls.add('eight', acquired=1)
839
    self.assert_('eight' in self.ls._names())
840
    self.assert_('eight' in self.ls._list_owned())
841
    self.ls.add('nine')
842
    self.assert_('nine' in self.ls._names())
843
    self.assert_('nine' not in self.ls._list_owned())
844
    self.ls.release()
845
    self.ls.remove(['two'])
846
    self.assert_('two' not in self.ls._names())
847
    self.ls.acquire('three')
848
    self.assertEquals(self.ls.remove(['three']), ['three'])
849
    self.assert_('three' not in self.ls._names())
850
    self.assertEquals(self.ls.remove('three'), [])
851
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
852
    self.assert_('one' not in self.ls._names())
853

    
854
  def testRemoveNonBlocking(self):
855
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
856
    self.ls.acquire('one')
857
    self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
858
    self.ls.acquire(['two', 'three'])
859
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
860
                      ['two', 'three'])
861

    
862
  def testNoDoubleAdd(self):
863
    self.assertRaises(errors.LockError, self.ls.add, 'two')
864
    self.ls.add('four')
865
    self.assertRaises(errors.LockError, self.ls.add, 'four')
866

    
867
  def testNoWrongRemoves(self):
868
    self.ls.acquire(['one', 'three'], shared=1)
869
    # Cannot remove 'two' while holding something which is not a superset
870
    self.assertRaises(AssertionError, self.ls.remove, 'two')
871
    # Cannot remove 'three' as we are sharing it
872
    self.assertRaises(AssertionError, self.ls.remove, 'three')
873

    
874
  def testAcquireSetLock(self):
875
    # acquire the set-lock exclusively
876
    self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
877
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
878
    self.assertEquals(self.ls._is_owned(), True)
879
    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
880
    # I can still add/remove elements...
881
    self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
882
    self.assert_(self.ls.add('six'))
883
    self.ls.release()
884
    # share the set-lock
885
    self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
886
    # adding new elements is not possible
887
    self.assertRaises(AssertionError, self.ls.add, 'five')
888
    self.ls.release()
889

    
890
  def testAcquireWithRepetitions(self):
891
    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
892
                      set(['two', 'two', 'three']))
893
    self.ls.release(['two', 'two'])
894
    self.assertEquals(self.ls._list_owned(), set(['three']))
895

    
896
  def testEmptyAcquire(self):
897
    # Acquire an empty list of locks...
898
    self.assertEquals(self.ls.acquire([]), set())
899
    self.assertEquals(self.ls._list_owned(), set())
900
    # New locks can still be addded
901
    self.assert_(self.ls.add('six'))
902
    # "re-acquiring" is not an issue, since we had really acquired nothing
903
    self.assertEquals(self.ls.acquire([], shared=1), set())
904
    self.assertEquals(self.ls._list_owned(), set())
905
    # We haven't really acquired anything, so we cannot release
906
    self.assertRaises(AssertionError, self.ls.release)
907

    
908
  def _doLockSet(self, names, shared):
909
    try:
910
      self.ls.acquire(names, shared=shared)
911
      self.done.put('DONE')
912
      self.ls.release()
913
    except errors.LockError:
914
      self.done.put('ERR')
915

    
916
  def _doAddSet(self, names):
917
    try:
918
      self.ls.add(names, acquired=1)
919
      self.done.put('DONE')
920
      self.ls.release()
921
    except errors.LockError:
922
      self.done.put('ERR')
923

    
924
  def _doRemoveSet(self, names):
925
    self.done.put(self.ls.remove(names))
926

    
927
  @_Repeat
928
  def testConcurrentSharedAcquire(self):
929
    self.ls.acquire(['one', 'two'], shared=1)
930
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
931
    self._waitThreads()
932
    self.assertEqual(self.done.get_nowait(), 'DONE')
933
    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
934
    self._waitThreads()
935
    self.assertEqual(self.done.get_nowait(), 'DONE')
936
    self._addThread(target=self._doLockSet, args=('three', 1))
937
    self._waitThreads()
938
    self.assertEqual(self.done.get_nowait(), 'DONE')
939
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
940
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
941
    self.assertRaises(Queue.Empty, self.done.get_nowait)
942
    self.ls.release()
943
    self._waitThreads()
944
    self.assertEqual(self.done.get_nowait(), 'DONE')
945
    self.assertEqual(self.done.get_nowait(), 'DONE')
946

    
947
  @_Repeat
948
  def testConcurrentExclusiveAcquire(self):
949
    self.ls.acquire(['one', 'two'])
950
    self._addThread(target=self._doLockSet, args=('three', 1))
951
    self._waitThreads()
952
    self.assertEqual(self.done.get_nowait(), 'DONE')
953
    self._addThread(target=self._doLockSet, args=('three', 0))
954
    self._waitThreads()
955
    self.assertEqual(self.done.get_nowait(), 'DONE')
956
    self.assertRaises(Queue.Empty, self.done.get_nowait)
957
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
958
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
959
    self._addThread(target=self._doLockSet, args=('one', 0))
960
    self._addThread(target=self._doLockSet, args=('one', 1))
961
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
962
    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
963
    self.assertRaises(Queue.Empty, self.done.get_nowait)
964
    self.ls.release()
965
    self._waitThreads()
966
    for _ in range(6):
967
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
968

    
969
  @_Repeat
970
  def testConcurrentRemove(self):
971
    self.ls.add('four')
972
    self.ls.acquire(['one', 'two', 'four'])
973
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
974
    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
975
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
976
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
977
    self.assertRaises(Queue.Empty, self.done.get_nowait)
978
    self.ls.remove('one')
979
    self.ls.release()
980
    self._waitThreads()
981
    for i in range(4):
982
      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
983
    self.ls.add(['five', 'six'], acquired=1)
984
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
985
    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
986
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
987
    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
988
    self.ls.remove('five')
989
    self.ls.release()
990
    self._waitThreads()
991
    for i in range(4):
992
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
993
    self.ls.acquire(['three', 'four'])
994
    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
995
    self.assertRaises(Queue.Empty, self.done.get_nowait)
996
    self.ls.remove('four')
997
    self._waitThreads()
998
    self.assertEqual(self.done.get_nowait(), ['six'])
999
    self._addThread(target=self._doRemoveSet, args=(['two']))
1000
    self._waitThreads()
1001
    self.assertEqual(self.done.get_nowait(), ['two'])
1002
    self.ls.release()
1003
    # reset lockset
1004
    self._setUpLS()
1005

    
1006
  @_Repeat
1007
  def testConcurrentSharedSetLock(self):
1008
    # share the set-lock...
1009
    self.ls.acquire(None, shared=1)
1010
    # ...another thread can share it too
1011
    self._addThread(target=self._doLockSet, args=(None, 1))
1012
    self._waitThreads()
1013
    self.assertEqual(self.done.get_nowait(), 'DONE')
1014
    # ...or just share some elements
1015
    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1016
    self._waitThreads()
1017
    self.assertEqual(self.done.get_nowait(), 'DONE')
1018
    # ...but not add new ones or remove any
1019
    t = self._addThread(target=self._doAddSet, args=(['nine']))
1020
    self._addThread(target=self._doRemoveSet, args=(['two'], ))
1021
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1022
    # this just releases the set-lock
1023
    self.ls.release([])
1024
    t.join(60)
1025
    self.assertEqual(self.done.get_nowait(), 'DONE')
1026
    # release the lock on the actual elements so remove() can proceed too
1027
    self.ls.release()
1028
    self._waitThreads()
1029
    self.failUnlessEqual(self.done.get_nowait(), ['two'])
1030
    # reset lockset
1031
    self._setUpLS()
1032

    
1033
  @_Repeat
1034
  def testConcurrentExclusiveSetLock(self):
1035
    # acquire the set-lock...
1036
    self.ls.acquire(None, shared=0)
1037
    # ...no one can do anything else
1038
    self._addThread(target=self._doLockSet, args=(None, 1))
1039
    self._addThread(target=self._doLockSet, args=(None, 0))
1040
    self._addThread(target=self._doLockSet, args=(['three'], 0))
1041
    self._addThread(target=self._doLockSet, args=(['two'], 1))
1042
    self._addThread(target=self._doAddSet, args=(['nine']))
1043
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1044
    self.ls.release()
1045
    self._waitThreads()
1046
    for _ in range(5):
1047
      self.assertEqual(self.done.get(True, 1), 'DONE')
1048
    # cleanup
1049
    self._setUpLS()
1050

    
1051
  @_Repeat
1052
  def testConcurrentSetLockAdd(self):
1053
    self.ls.acquire('one')
1054
    # Another thread wants the whole SetLock
1055
    self._addThread(target=self._doLockSet, args=(None, 0))
1056
    self._addThread(target=self._doLockSet, args=(None, 1))
1057
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1058
    self.assertRaises(AssertionError, self.ls.add, 'four')
1059
    self.ls.release()
1060
    self._waitThreads()
1061
    self.assertEqual(self.done.get_nowait(), 'DONE')
1062
    self.assertEqual(self.done.get_nowait(), 'DONE')
1063
    self.ls.acquire(None)
1064
    self._addThread(target=self._doLockSet, args=(None, 0))
1065
    self._addThread(target=self._doLockSet, args=(None, 1))
1066
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1067
    self.ls.add('four')
1068
    self.ls.add('five', acquired=1)
1069
    self.ls.add('six', acquired=1, shared=1)
1070
    self.assertEquals(self.ls._list_owned(),
1071
      set(['one', 'two', 'three', 'five', 'six']))
1072
    self.assertEquals(self.ls._is_owned(), True)
1073
    self.assertEquals(self.ls._names(),
1074
      set(['one', 'two', 'three', 'four', 'five', 'six']))
1075
    self.ls.release()
1076
    self._waitThreads()
1077
    self.assertEqual(self.done.get_nowait(), 'DONE')
1078
    self.assertEqual(self.done.get_nowait(), 'DONE')
1079
    self._setUpLS()
1080

    
1081
  @_Repeat
1082
  def testEmptyLockSet(self):
1083
    # get the set-lock
1084
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1085
    # now empty it...
1086
    self.ls.remove(['one', 'two', 'three'])
1087
    # and adds/locks by another thread still wait
1088
    self._addThread(target=self._doAddSet, args=(['nine']))
1089
    self._addThread(target=self._doLockSet, args=(None, 1))
1090
    self._addThread(target=self._doLockSet, args=(None, 0))
1091
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1092
    self.ls.release()
1093
    self._waitThreads()
1094
    for _ in range(3):
1095
      self.assertEqual(self.done.get_nowait(), 'DONE')
1096
    # empty it again...
1097
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
1098
    # now share it...
1099
    self.assertEqual(self.ls.acquire(None, shared=1), set())
1100
    # other sharers can go, adds still wait
1101
    self._addThread(target=self._doLockSet, args=(None, 1))
1102
    self._waitThreads()
1103
    self.assertEqual(self.done.get_nowait(), 'DONE')
1104
    self._addThread(target=self._doAddSet, args=(['nine']))
1105
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1106
    self.ls.release()
1107
    self._waitThreads()
1108
    self.assertEqual(self.done.get_nowait(), 'DONE')
1109
    self._setUpLS()
1110

    
1111

    
1112
class TestGanetiLockManager(_ThreadedTestCase):
1113

    
1114
  def setUp(self):
1115
    _ThreadedTestCase.setUp(self)
1116
    self.nodes=['n1', 'n2']
1117
    self.instances=['i1', 'i2', 'i3']
1118
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
1119
                                        instances=self.instances)
1120
    self.done = Queue.Queue(0)
1121

    
1122
  def tearDown(self):
1123
    # Don't try this at home...
1124
    locking.GanetiLockManager._instance = None
1125

    
1126
  def testLockingConstants(self):
1127
    # The locking library internally cheats by assuming its constants have some
1128
    # relationships with each other. Check those hold true.
1129
    # This relationship is also used in the Processor to recursively acquire
1130
    # the right locks. Again, please don't break it.
1131
    for i in range(len(locking.LEVELS)):
1132
      self.assertEqual(i, locking.LEVELS[i])
1133

    
1134
  def testDoubleGLFails(self):
1135
    self.assertRaises(AssertionError, locking.GanetiLockManager)
1136

    
1137
  def testLockNames(self):
1138
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1139
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1140
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1141
                     set(self.instances))
1142

    
1143
  def testInitAndResources(self):
1144
    locking.GanetiLockManager._instance = None
1145
    self.GL = locking.GanetiLockManager()
1146
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1147
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1148
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1149

    
1150
    locking.GanetiLockManager._instance = None
1151
    self.GL = locking.GanetiLockManager(nodes=self.nodes)
1152
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1153
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1154
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1155

    
1156
    locking.GanetiLockManager._instance = None
1157
    self.GL = locking.GanetiLockManager(instances=self.instances)
1158
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1159
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1160
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1161
                     set(self.instances))
1162

    
1163
  def testAcquireRelease(self):
1164
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1165
    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1166
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1167
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1168
    self.GL.release(locking.LEVEL_NODE, ['n2'])
1169
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1170
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1171
    self.GL.release(locking.LEVEL_NODE)
1172
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1173
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1174
    self.GL.release(locking.LEVEL_INSTANCE)
1175
    self.assertRaises(errors.LockError, self.GL.acquire,
1176
                      locking.LEVEL_INSTANCE, ['i5'])
1177
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1178
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1179

    
1180
  def testAcquireWholeSets(self):
1181
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1182
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1183
                      set(self.instances))
1184
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1185
                      set(self.instances))
1186
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1187
                      set(self.nodes))
1188
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1189
                      set(self.nodes))
1190
    self.GL.release(locking.LEVEL_NODE)
1191
    self.GL.release(locking.LEVEL_INSTANCE)
1192
    self.GL.release(locking.LEVEL_CLUSTER)
1193

    
1194
  def testAcquireWholeAndPartial(self):
1195
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1196
    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1197
                      set(self.instances))
1198
    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1199
                      set(self.instances))
1200
    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1201
                      set(['n2']))
1202
    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1203
                      set(['n2']))
1204
    self.GL.release(locking.LEVEL_NODE)
1205
    self.GL.release(locking.LEVEL_INSTANCE)
1206
    self.GL.release(locking.LEVEL_CLUSTER)
1207

    
1208
  def testBGLDependency(self):
1209
    self.assertRaises(AssertionError, self.GL.acquire,
1210
                      locking.LEVEL_NODE, ['n1', 'n2'])
1211
    self.assertRaises(AssertionError, self.GL.acquire,
1212
                      locking.LEVEL_INSTANCE, ['i3'])
1213
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1214
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1215
    self.assertRaises(AssertionError, self.GL.release,
1216
                      locking.LEVEL_CLUSTER, ['BGL'])
1217
    self.assertRaises(AssertionError, self.GL.release,
1218
                      locking.LEVEL_CLUSTER)
1219
    self.GL.release(locking.LEVEL_NODE)
1220
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1221
    self.assertRaises(AssertionError, self.GL.release,
1222
                      locking.LEVEL_CLUSTER, ['BGL'])
1223
    self.assertRaises(AssertionError, self.GL.release,
1224
                      locking.LEVEL_CLUSTER)
1225
    self.GL.release(locking.LEVEL_INSTANCE)
1226

    
1227
  def testWrongOrder(self):
1228
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1229
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1230
    self.assertRaises(AssertionError, self.GL.acquire,
1231
                      locking.LEVEL_NODE, ['n1'])
1232
    self.assertRaises(AssertionError, self.GL.acquire,
1233
                      locking.LEVEL_INSTANCE, ['i2'])
1234

    
1235
  # Helper function to run as a thread that shared the BGL and then acquires
1236
  # some locks at another level.
1237
  def _doLock(self, level, names, shared):
1238
    try:
1239
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1240
      self.GL.acquire(level, names, shared=shared)
1241
      self.done.put('DONE')
1242
      self.GL.release(level)
1243
      self.GL.release(locking.LEVEL_CLUSTER)
1244
    except errors.LockError:
1245
      self.done.put('ERR')
1246

    
1247
  @_Repeat
1248
  def testConcurrency(self):
1249
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1250
    self._addThread(target=self._doLock,
1251
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1252
    self._waitThreads()
1253
    self.assertEqual(self.done.get_nowait(), 'DONE')
1254
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1255
    self._addThread(target=self._doLock,
1256
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
1257
    self._waitThreads()
1258
    self.assertEqual(self.done.get_nowait(), 'DONE')
1259
    self._addThread(target=self._doLock,
1260
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
1261
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1262
    self.GL.release(locking.LEVEL_INSTANCE)
1263
    self._waitThreads()
1264
    self.assertEqual(self.done.get_nowait(), 'DONE')
1265
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1266
    self._addThread(target=self._doLock,
1267
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
1268
    self._waitThreads()
1269
    self.assertEqual(self.done.get_nowait(), 'DONE')
1270
    self._addThread(target=self._doLock,
1271
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
1272
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1273
    self.GL.release(locking.LEVEL_INSTANCE)
1274
    self._waitThreads()
1275
    self.assertEqual(self.done.get(True, 1), 'DONE')
1276
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1277

    
1278

    
1279
if __name__ == '__main__':
1280
  unittest.main()
1281
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1282
  #unittest.TextTestRunner(verbosity=2).run(suite)