Revision 84e344d4 test/ganeti.locking_unittest.py

b/test/ganeti.locking_unittest.py
26 26
import unittest
27 27
import time
28 28
import Queue
29
import threading
29 30

  
30 31
from ganeti import locking
31 32
from ganeti import errors
32
from threading import Thread
33 33

  
34 34

  
35 35
# This is used to test the ssynchronize decorator.
......
39 39
#: List for looping tests
40 40
ITERATIONS = range(8)
41 41

  
42

  
42 43
def _Repeat(fn):
43 44
  """Decorator for executing a function many times"""
44 45
  def wrapper(*args, **kwargs):
......
46 47
      fn(*args, **kwargs)
47 48
  return wrapper
48 49

  
50

  
49 51
class _ThreadedTestCase(unittest.TestCase):
50 52
  """Test class that supports adding/waiting on threads"""
51 53
  def setUp(self):
......
54 56

  
55 57
  def _addThread(self, *args, **kwargs):
56 58
    """Create and remember a new thread"""
57
    t = Thread(*args, **kwargs)
59
    t = threading.Thread(*args, **kwargs)
58 60
    self.threads.append(t)
59 61
    t.start()
60 62
    return t
......
147 149

  
148 150
  def testSharersCanCoexist(self):
149 151
    self.sl.acquire(shared=1)
150
    Thread(target=self._doItSharer).start()
152
    threading.Thread(target=self._doItSharer).start()
151 153
    self.assert_(self.done.get(True, 1))
152 154
    self.sl.release()
153 155

  
......
234 236
    self.assertEqual(self.done.get_nowait(), 'SHR')
235 237
    self.assertEqual(self.done.get_nowait(), 'EXC')
236 238

  
237
  def testNoNonBlocking(self):
238
    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
239
    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
240
    self.sl.acquire()
241
    self.sl.delete(blocking=0) # Fine, because the lock is already acquired
242

  
243 239
  def testDelete(self):
244 240
    self.sl.delete()
245 241
    self.assertRaises(errors.LockError, self.sl.acquire)
......
280 276
    self.assertEqual(self.done.get_nowait(), 'ERR')
281 277
    self.sl = locking.SharedLock()
282 278

  
279
  @_Repeat
280
  def testExclusiveAcquireTimeout(self):
281
    def _LockExclusive(wait):
282
      self.sl.acquire(shared=0)
283
      self.done.put("A: start sleep")
284
      time.sleep(wait)
285
      self.done.put("A: end sleep")
286
      self.sl.release()
287

  
288
    for shared in [0, 1]:
289
      # Start thread to hold lock for 20 ms
290
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
291

  
292
      # Wait up to 100 ms to get lock
293
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
294
      self.done.put("got 2nd")
295
      self.sl.release()
296

  
297
      self._waitThreads()
298

  
299
      self.assertEqual(self.done.get_nowait(), "A: start sleep")
300
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
301
      self.assertEqual(self.done.get_nowait(), "got 2nd")
302
      self.assertRaises(Queue.Empty, self.done.get_nowait)
303

  
304
  @_Repeat
305
  def testAcquireExpiringTimeout(self):
306
    def _AcquireWithTimeout(shared, timeout):
307
      if not self.sl.acquire(shared=shared, timeout=timeout):
308
        self.done.put("timeout")
309

  
310
    for shared in [0, 1]:
311
      # Lock exclusively
312
      self.sl.acquire()
313

  
314
      # Start shared acquires with timeout between 0 and 20 ms
315
      for i in xrange(11):
316
        self._addThread(target=_AcquireWithTimeout,
317
                        args=(shared, i * 2.0 / 1000.0))
318

  
319
      # Wait for threads to finish (makes sure the acquire timeout expires
320
      # before releasing the lock)
321
      self._waitThreads()
322

  
323
      # Release lock
324
      self.sl.release()
325

  
326
      for _ in xrange(11):
327
        self.assertEqual(self.done.get_nowait(), "timeout")
328

  
329
      self.assertRaises(Queue.Empty, self.done.get_nowait)
330

  
331
  @_Repeat
332
  def testSharedSkipExclusiveAcquires(self):
333
    # Tests whether shared acquires jump in front of exclusive acquires in the
334
    # queue.
335

  
336
    # Get exclusive lock while we fill the queue
337
    self.sl.acquire()
338

  
339
    def _Acquire(shared, name):
340
      if not self.sl.acquire(shared=shared):
341
        return
342

  
343
      self.done.put(name)
344
      self.sl.release()
345

  
346
    # Start shared acquires
347
    for _ in xrange(5):
348
      self._addThread(target=_Acquire, args=(1, "shared A"))
349

  
350
    # Start exclusive acquires
351
    for _ in xrange(3):
352
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
353

  
354
    # More shared acquires
355
    for _ in xrange(5):
356
      self._addThread(target=_Acquire, args=(1, "shared C"))
357

  
358
    # More exclusive acquires
359
    for _ in xrange(3):
360
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
361

  
362
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
363
    # together
364
    self.assertEqual(self.sl._count_pending(), 7)
365

  
366
    # Release exclusive lock and wait
367
    self.sl.release()
368

  
369
    self._waitThreads()
370

  
371
    # Check sequence
372
    for _ in xrange(10):
373
      # Shared locks aren't guaranteed to be notified in order, but they'll be
374
      # first
375
      self.assert_(self.done.get_nowait() in ("shared A", "shared C"))
376

  
377
    for _ in xrange(3):
378
      self.assertEqual(self.done.get_nowait(), "exclusive B")
379

  
380
    for _ in xrange(3):
381
      self.assertEqual(self.done.get_nowait(), "exclusive D")
382

  
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384

  
385
  @_Repeat
386
  def testMixedAcquireTimeout(self):
387
    sync = threading.Condition()
388

  
389
    def _AcquireShared(ev):
390
      if not self.sl.acquire(shared=1, timeout=None):
391
        return
392

  
393
      self.done.put("shared")
394

  
395
      # Notify main thread
396
      ev.set()
397

  
398
      # Wait for notification
399
      sync.acquire()
400
      try:
401
        sync.wait()
402
      finally:
403
        sync.release()
404

  
405
      # Release lock
406
      self.sl.release()
407

  
408
    acquires = []
409
    for _ in xrange(3):
410
      ev = threading.Event()
411
      self._addThread(target=_AcquireShared, args=(ev, ))
412
      acquires.append(ev)
413

  
414
    # Wait for all acquires to finish
415
    for i in acquires:
416
      i.wait()
417

  
418
    self.assertEqual(self.sl._count_pending(), 0)
419

  
420
    # Try to get exclusive lock
421
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
422

  
423
    # Acquire exclusive without timeout
424
    exclsync = threading.Condition()
425
    exclev = threading.Event()
426

  
427
    def _AcquireExclusive():
428
      if not self.sl.acquire(shared=0):
429
        return
430

  
431
      self.done.put("exclusive")
432

  
433
      # Notify main thread
434
      exclev.set()
435

  
436
      exclsync.acquire()
437
      try:
438
        exclsync.wait()
439
      finally:
440
        exclsync.release()
441

  
442
      self.sl.release()
443

  
444
    self._addThread(target=_AcquireExclusive)
445

  
446
    # Try to get exclusive lock
447
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
448

  
449
    # Make all shared holders release their locks
450
    sync.acquire()
451
    try:
452
      sync.notifyAll()
453
    finally:
454
      sync.release()
455

  
456
    # Wait for exclusive acquire to succeed
457
    exclev.wait()
458

  
459
    self.assertEqual(self.sl._count_pending(), 0)
460

  
461
    # Try to get exclusive lock
462
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
463

  
464
    def _AcquireSharedSimple():
465
      if self.sl.acquire(shared=1, timeout=None):
466
        self.done.put("shared2")
467
        self.sl.release()
468

  
469
    for _ in xrange(10):
470
      self._addThread(target=_AcquireSharedSimple)
471

  
472
    # Tell exclusive lock to release
473
    exclsync.acquire()
474
    try:
475
      exclsync.notifyAll()
476
    finally:
477
      exclsync.release()
478

  
479
    # Wait for everything to finish
480
    self._waitThreads()
481

  
482
    self.assertEqual(self.sl._count_pending(), 0)
483

  
484
    # Check sequence
485
    for _ in xrange(3):
486
      self.assertEqual(self.done.get_nowait(), "shared")
487

  
488
    self.assertEqual(self.done.get_nowait(), "exclusive")
489

  
490
    for _ in xrange(10):
491
      self.assertEqual(self.done.get_nowait(), "shared2")
492

  
493
    self.assertRaises(Queue.Empty, self.done.get_nowait)
494

  
283 495

  
284 496
class TestSSynchronizedDecorator(_ThreadedTestCase):
285 497
  """Shared Lock Synchronized decorator test"""
......
307 519

  
308 520
  def testSharersCanCoexist(self):
309 521
    _decoratorlock.acquire(shared=1)
310
    Thread(target=self._doItSharer).start()
522
    threading.Thread(target=self._doItSharer).start()
311 523
    self.assert_(self.done.get(True, 1))
312 524
    _decoratorlock.release()
313 525

  
......
354 566
    self.resources = ['one', 'two', 'three']
355 567
    self.ls = locking.LockSet(members=self.resources)
356 568

  
357

  
358 569
  def testResources(self):
359 570
    self.assertEquals(self.ls._names(), set(self.resources))
360 571
    newls = locking.LockSet()
......
489 700
    # We haven't really acquired anything, so we cannot release
490 701
    self.assertRaises(AssertionError, self.ls.release)
491 702

  
492
  def _doLockSet(self, set, shared):
703
  def _doLockSet(self, names, shared):
493 704
    try:
494
      self.ls.acquire(set, shared=shared)
705
      self.ls.acquire(names, shared=shared)
495 706
      self.done.put('DONE')
496 707
      self.ls.release()
497 708
    except errors.LockError:
498 709
      self.done.put('ERR')
499 710

  
500
  def _doAddSet(self, set):
711
  def _doAddSet(self, names):
501 712
    try:
502
      self.ls.add(set, acquired=1)
713
      self.ls.add(names, acquired=1)
503 714
      self.done.put('DONE')
504 715
      self.ls.release()
505 716
    except errors.LockError:
506 717
      self.done.put('ERR')
507 718

  
508
  def _doRemoveSet(self, set):
509
    self.done.put(self.ls.remove(set))
719
  def _doRemoveSet(self, names):
720
    self.done.put(self.ls.remove(names))
510 721

  
511 722
  @_Repeat
512 723
  def testConcurrentSharedAcquire(self):
......
537 748
    self._addThread(target=self._doLockSet, args=('three', 0))
538 749
    self._waitThreads()
539 750
    self.assertEqual(self.done.get_nowait(), 'DONE')
751
    self.assertRaises(Queue.Empty, self.done.get_nowait)
540 752
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
541 753
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
542 754
    self._addThread(target=self._doLockSet, args=('one', 0))

Also available in: Unified diff