from ganeti import locking
from ganeti import errors
+import testutils
+
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
return wrapper
+def SafeSleep(duration):
+ """Sleep until C{duration} seconds have passed since the call.
+
+ The remaining delay is recomputed from the start time after every
+ C{time.sleep} call, and the function returns once that delay is no longer
+ positive. A zero or negative C{duration} returns without sleeping.
+
+ NOTE(review): the loop presumably guards against C{time.sleep} returning
+ early -- confirm.
+
+ """
+ start = time.time()
+ while True:
+ # Time still left until start + duration
+ delay = start + duration - time.time()
+ if delay <= 0.0:
+ break
+ time.sleep(delay)
+
+
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
self.sl.acquire()
# Start shared acquires with timeout between 0 and 20 ms
- for i in xrange(11):
+ for i in range(11):
self._addThread(target=_AcquireWithTimeout,
args=(shared, i * 2.0 / 1000.0))
# Release lock
self.sl.release()
- for _ in xrange(11):
+ for _ in range(11):
self.assertEqual(self.done.get_nowait(), "timeout")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertEqual(shrcnt3, 0)
self.assertEqual(shrcnt3, 0)
- for _ in xrange(3):
+ for _ in range(3):
self.assertEqual(self.done.get_nowait(), "exclusive 1")
- for _ in xrange(3):
+ for _ in range(3):
self.assertEqual(self.done.get_nowait(), "exclusive 2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testMixedAcquireTimeout(self):
- sync = threading.Condition()
+ sync = threading.Event()
def _AcquireShared(ev):
if not self.sl.acquire(shared=1, timeout=None):
# Notify main thread
ev.set()
- # Wait for notification
- sync.acquire()
- try:
- sync.wait()
- finally:
- sync.release()
+ # Wait for notification from main thread
+ sync.wait()
# Release lock
self.sl.release()
acquires = []
- for _ in xrange(3):
+ for _ in range(3):
ev = threading.Event()
self._addThread(target=_AcquireShared, args=(ev, ))
acquires.append(ev)
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Acquire exclusive without timeout
- exclsync = threading.Condition()
+ exclsync = threading.Event()
exclev = threading.Event()
def _AcquireExclusive():
# Notify main thread
exclev.set()
- exclsync.acquire()
- try:
- exclsync.wait()
- finally:
- exclsync.release()
+ # Wait for notification from main thread
+ exclsync.wait()
self.sl.release()
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Make all shared holders release their locks
- sync.acquire()
- try:
- sync.notifyAll()
- finally:
- sync.release()
+ sync.set()
# Wait for exclusive acquire to succeed
exclev.wait()
self.done.put("shared2")
self.sl.release()
- for _ in xrange(10):
+ for _ in range(10):
self._addThread(target=_AcquireSharedSimple)
# Tell exclusive lock to release
- exclsync.acquire()
- try:
- exclsync.notifyAll()
- finally:
- exclsync.release()
+ exclsync.set()
# Wait for everything to finish
self._waitThreads()
self.assertEqual(self.sl._count_pending(), 0)
# Check sequence
- for _ in xrange(3):
+ for _ in range(3):
self.assertEqual(self.done.get_nowait(), "shared")
self.assertEqual(self.done.get_nowait(), "exclusive")
- for _ in xrange(10):
+ for _ in range(10):
self.assertEqual(self.done.get_nowait(), "shared2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.failUnlessEqual(self.done.get_nowait(), 'DONE')
@_Repeat
+ def testSimpleAcquireTimeoutExpiring(self):
+ """Check that a timed acquire fails while a conflicting lock is held.
+
+ For every (wanted, block) pair, C{block} is acquired exclusively by the
+ test itself and a target added via C{_addThread} then tries to acquire
+ C{wanted} exclusively with a 0.1s timeout. The only expected result on
+ the C{done} queue is "not acquired".
+
+ """
+ names = sorted(self.ls._names())
+ self.assert_(len(names) >= 3)
+
+ # Get name of first lock
+ first = names[0]
+
+ # Get name of last lock (this also removes it from the local list)
+ last = names.pop()
+
+ # Each entry is (wanted, block): "block" is held exclusively while the
+ # thread tries to acquire "wanted"
+ checks = [
+ # Block first and try to lock it again
+ (first, first),
+
+ # Block first and try to lock all locks
+ (None, first),
+
+ # Block last and try to lock it again
+ (last, last),
+ ]
+
+ for (wanted, block) in checks:
+ # Lock in exclusive mode
+ self.assert_(self.ls.acquire(block, shared=0))
+
+ def _AcquireOne():
+ # Try to get the same lock again with a timeout (should never succeed)
+ acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
+ if acquired:
+ self.done.put("acquired")
+ self.ls.release()
+ else:
+ # A timed-out acquire is expected to return None and leave nothing owned
+ self.assert_(acquired is None)
+ self.assert_(not self.ls._list_owned())
+ self.assert_(not self.ls._is_owned())
+ self.done.put("not acquired")
+
+ self._addThread(target=_AcquireOne)
+
+ # Wait for timeout in thread to expire
+ self._waitThreads()
+
+ # Release exclusive lock again
+ self.ls.release()
+
+ # Exactly one result must have been queued, and it must be the failure
+ self.assertEqual(self.done.get_nowait(), "not acquired")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ @_Repeat
+ def testDelayedAndExpiringLockAcquire(self):
+ """Acquire all locks with a timeout while they are released one by one.
+
+ Runs twice. With C{expire} false the locks are released without delay and
+ the thread is expected to queue a "getting <name>" entry for every lock
+ followed by "got all". With C{expire} true every release is delayed by
+ 0.3s against a 0.4s timeout and the thread is expected to queue
+ "timeout on all".
+
+ """
+ self._setUpLS()
+ self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
+
+ for expire in (False, True):
+ names = sorted(self.ls._names())
+ self.assertEqual(len(names), 8)
+
+ # One event per lock name, set from acquire_notification below
+ lock_ev = dict([(i, threading.Event()) for i in names])
+
+ # Lock all in exclusive mode
+ self.assert_(self.ls.acquire(names, shared=0))
+
+ if expire:
+ # We'll wait at least 300ms per lock
+ lockwait = len(names) * [0.3]
+
+ # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
+ # this gives us up to 2.4s to fail.
+ lockall_timeout = 0.4
+ else:
+ # This should finish rather quickly
+ lockwait = None
+ lockall_timeout = len(names) * 5.0
+
+ def _LockAll():
+ def acquire_notification(name):
+ # Progress is only recorded in the non-expiring run, where the order of
+ # the entries is checked afterwards
+ if not expire:
+ self.done.put("getting %s" % name)
+
+ # Kick next lock
+ lock_ev[name].set()
+
+ # NOTE(review): test_notify is presumably called with each lock's name as
+ # the acquire on that lock starts -- confirm against locking
+ if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
+ test_notify=acquire_notification):
+ self.done.put("got all")
+ self.ls.release()
+ else:
+ self.done.put("timeout on all")
+
+ # Notify all locks
+ for ev in lock_ev.values():
+ ev.set()
+
+ t = self._addThread(target=_LockAll)
+
+ for idx, name in enumerate(names):
+ # Wait for actual acquire on this lock to start (at most 10 seconds)
+ lock_ev[name].wait(10.0)
+
+ if expire and t.isAlive():
+ # Wait some time after getting the notification to make sure the lock
+ # acquire will expire
+ SafeSleep(lockwait[idx])
+
+ self.ls.release(names=name)
+
+ self.assert_(not self.ls._list_owned())
+
+ self._waitThreads()
+
+ if expire:
+ # Not checking which locks were actually acquired. Doing so would be
+ # too timing-dependent.
+ self.assertEqual(self.done.get_nowait(), "timeout on all")
+ else:
+ for i in names:
+ self.assertEqual(self.done.get_nowait(), "getting %s" % i)
+ self.assertEqual(self.done.get_nowait(), "got all")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ @_Repeat
def testConcurrentRemove(self):
self.ls.add('four')
self.ls.acquire(['one', 'two', 'four'])
if __name__ == '__main__':
- unittest.main()
- #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
- #unittest.TextTestRunner(verbosity=2).run(suite)
+ # Script entry point: run via the testutils program class rather than
+ # unittest.main() (NOTE(review): its exact behaviour is defined in testutils)
+ testutils.GanetiTestProgram()