Revision 26082b7e test/ganeti.locking_unittest.py
b/test/ganeti.locking_unittest.py | ||
---|---|---|
433 | 433 |
self.assertRaises(errors.LockError, self.sl.delete) |
434 | 434 |
|
435 | 435 |
def testDeleteTimeout(self): |
436 |
self.sl.delete(timeout=60) |
|
436 |
self.assertTrue(self.sl.delete(timeout=60)) |
|
437 |
|
|
438 |
def testDeleteTimeoutFail(self): |
|
439 |
ready = threading.Event() |
|
440 |
finish = threading.Event() |
|
441 |
|
|
442 |
def fn(): |
|
443 |
self.sl.acquire(shared=0) |
|
444 |
ready.set() |
|
445 |
|
|
446 |
finish.wait() |
|
447 |
self.sl.release() |
|
448 |
|
|
449 |
self._addThread(target=fn) |
|
450 |
ready.wait() |
|
451 |
|
|
452 |
# Test if deleting a lock owned in exclusive mode by another thread fails |
|
453 |
# to delete when a timeout is used |
|
454 |
self.assertFalse(self.sl.delete(timeout=0.02)) |
|
455 |
|
|
456 |
finish.set() |
|
457 |
self._waitThreads() |
|
458 |
|
|
459 |
self.assertTrue(self.sl.delete()) |
|
460 |
self.assertRaises(errors.LockError, self.sl.acquire) |
|
437 | 461 |
|
438 | 462 |
def testNoDeleteIfSharer(self): |
439 | 463 |
self.sl.acquire(shared=1) |
Also available in: Unified diff