#!/usr/bin/python
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import time
import Queue
import threading
+import random
from ganeti import locking
from ganeti import errors
+from ganeti import utils
+
+import testutils
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
-_decoratorlock = locking.SharedLock()
+_decoratorlock = locking.SharedLock("decorator lock")
#: List for looping tests
ITERATIONS = range(8)
return wrapper
+def SafeSleep(duration):
+ start = time.time()
+ while True:
+ delay = start + duration - time.time()
+ if delay <= 0.0:
+ break
+ time.sleep(delay)
+
+
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
self.cond = cls(self.lock)
def _testAcquireRelease(self):
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
+
+
+class TestSingleNotifyPipeCondition(_ConditionTestCase):
+ """SingleNotifyPipeCondition tests"""
+
+ def setUp(self):
+ _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
+
+ def testAcquireRelease(self):
+ self._testAcquireRelease()
+
+ def testNotification(self):
+ self._testNotification()
+
+ def testWaitReuse(self):
+ self.cond.acquire()
+ self.cond.wait(0)
+ self.cond.wait(0.1)
+ self.cond.release()
+
+ def testNoNotifyReuse(self):
+ self.cond.acquire()
+ self.cond.notifyAll()
+ self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.notifyAll)
+ self.cond.release()
class TestPipeCondition(_ConditionTestCase):
- """_PipeCondition tests"""
+ """PipeCondition tests"""
def setUp(self):
- _ConditionTestCase.setUp(self, locking._PipeCondition)
+ _ConditionTestCase.setUp(self, locking.PipeCondition)
def testAcquireRelease(self):
self._testAcquireRelease()
self._testNotification()
def _TestWait(self, fn):
- self._addThread(target=fn)
- self._addThread(target=fn)
- self._addThread(target=fn)
+ threads = [
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ ]
# Wait for threads to be waiting
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
+ for _ in threads:
+ self.assertEqual(self.done.get(True, 1), "A")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.acquire()
- self.assertEqual(self.cond._nwaiters, 3)
- # This new thread can"t acquire the lock, and thus call wait, before we
+ self.assertEqual(len(self.cond._waiters), 3)
+ self.assertEqual(self.cond._waiters, set(threads))
+ # This new thread can't acquire the lock, and thus call wait, before we
# release it
self._addThread(target=fn)
self.cond.notifyAll()
self.assertRaises(Queue.Empty, self.done.get_nowait)
-class TestSingleActionPipeCondition(unittest.TestCase):
- """_SingleActionPipeCondition tests"""
-
- def setUp(self):
- self.cond = locking._SingleActionPipeCondition()
-
- def testInitialization(self):
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
- self.assertEqual(self.cond._nwaiters, 0)
-
- def testUsageCount(self):
- self.cond.StartWaiting()
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
- self.assertEqual(self.cond._nwaiters, 1)
-
- # use again
- self.cond.StartWaiting()
- self.assertEqual(self.cond._nwaiters, 2)
-
- # there is more than one user
- self.assert_(not self.cond.DoneWaiting())
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
- self.assertEqual(self.cond._nwaiters, 1)
-
- self.assert_(self.cond.DoneWaiting())
- self.assertEqual(self.cond._nwaiters, 0)
- self.assert_(self.cond._read_fd is None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is None)
-
- def testNotify(self):
- wait1 = self.cond.StartWaiting()
- wait2 = self.cond.StartWaiting()
-
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
-
- self.cond.notifyAll()
-
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is not None)
-
- self.assert_(not self.cond.DoneWaiting())
-
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is not None)
-
- self.assert_(self.cond.DoneWaiting())
-
- self.assert_(self.cond._read_fd is None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is None)
-
- def testReusage(self):
- self.cond.StartWaiting()
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
-
- self.assert_(self.cond.DoneWaiting())
-
- self.assertRaises(RuntimeError, self.cond.StartWaiting)
- self.assert_(self.cond._read_fd is None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is None)
-
- def testNotifyTwice(self):
- self.cond.notifyAll()
- self.assertRaises(RuntimeError, self.cond.notifyAll)
-
-
class TestSharedLock(_ThreadedTestCase):
"""SharedLock tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock("TestSharedLock")
def testSequenceAndOwnership(self):
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assertFalse(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
- self.assert_(not self.sl._is_owned(shared=1))
+ self.assertFalse(self.sl._is_owned(shared=1))
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assertFalse(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveBlocksSharer(self):
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testWaitingExclusiveBlocksSharer(self):
# The threads who were pending return ERR
for _ in range(4):
self.assertEqual(self.done.get_nowait(), 'ERR')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testDeletePendingDeleteExclusiveSharers(self):
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveAcquireTimeout(self):
- def _LockExclusive(wait):
- self.sl.acquire(shared=0)
- self.done.put("A: start sleep")
- time.sleep(wait)
- self.done.put("A: end sleep")
- self.sl.release()
-
for shared in [0, 1]:
- # Start thread to hold lock for 20 ms
- self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
+ on_queue = threading.Event()
+ release_exclusive = threading.Event()
+
+ def _LockExclusive():
+ self.sl.acquire(shared=0, test_notify=on_queue.set)
+ self.done.put("A: start wait")
+ release_exclusive.wait()
+ self.done.put("A: end wait")
+ self.sl.release()
- # Wait for sleep to begin
- self.assertEqual(self.done.get(), "A: start sleep")
+ # Start thread to hold lock in exclusive mode
+ self._addThread(target=_LockExclusive)
+
+ # Wait for wait to begin
+ self.assertEqual(self.done.get(timeout=60), "A: start wait")
+
+ # Wait up to 60s to get lock, but release exclusive lock as soon as we're
+ # on the queue
+ self.failUnless(self.sl.acquire(shared=shared, timeout=60,
+ test_notify=release_exclusive.set))
- # Wait up to 100 ms to get lock
- self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
self.done.put("got 2nd")
self.sl.release()
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), "A: end sleep")
+ self.assertEqual(self.done.get_nowait(), "A: end wait")
self.assertEqual(self.done.get_nowait(), "got 2nd")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.acquire()
# Start shared acquires with timeout between 0 and 20 ms
- for i in xrange(11):
+ for i in range(11):
self._addThread(target=_AcquireWithTimeout,
args=(shared, i * 2.0 / 1000.0))
# Release lock
self.sl.release()
- for _ in xrange(11):
+ for _ in range(11):
self.assertEqual(self.done.get_nowait(), "timeout")
self.assertRaises(Queue.Empty, self.done.get_nowait)
# Tests whether shared acquires jump in front of exclusive acquires in the
# queue.
- # Get exclusive lock while we fill the queue
- self.sl.acquire()
+ def _Acquire(shared, name, notify_ev, wait_ev):
+ if notify_ev:
+ notify_fn = notify_ev.set
+ else:
+ notify_fn = None
- def _Acquire(shared, name):
- if not self.sl.acquire(shared=shared):
+ if wait_ev:
+ wait_ev.wait()
+
+ if not self.sl.acquire(shared=shared, test_notify=notify_fn):
return
self.done.put(name)
self.sl.release()
- # Start shared acquires
- for _ in xrange(5):
- self._addThread(target=_Acquire, args=(1, "shared A"))
+ # Get exclusive lock while we fill the queue
+ self.sl.acquire()
+
+ shrcnt1 = 5
+ shrcnt2 = 7
+ shrcnt3 = 9
+ shrcnt4 = 2
+
+ # Add acquires using threading.Event for synchronization. They'll be
+ # acquired exactly in the order defined in this list.
+ acquires = (shrcnt1 * [(1, "shared 1")] +
+ 3 * [(0, "exclusive 1")] +
+ shrcnt2 * [(1, "shared 2")] +
+ shrcnt3 * [(1, "shared 3")] +
+ shrcnt4 * [(1, "shared 4")] +
+ 3 * [(0, "exclusive 2")])
- # Start exclusive acquires
- for _ in xrange(3):
- self._addThread(target=_Acquire, args=(0, "exclusive B"))
+ ev_cur = None
+ ev_prev = None
- # More shared acquires
- for _ in xrange(5):
- self._addThread(target=_Acquire, args=(1, "shared C"))
+ for args in acquires:
+ ev_cur = threading.Event()
+ self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
+ ev_prev = ev_cur
- # More exclusive acquires
- for _ in xrange(3):
- self._addThread(target=_Acquire, args=(0, "exclusive D"))
+ # Wait for last acquire to start
+ ev_prev.wait()
# Expect 6 pending exclusive acquires and 1 for all shared acquires
- # together. There's no way to wait for SharedLock.acquire to start
- # its work. Hence the timeout of 2 seconds.
- pending = 0
- end_time = time.time() + 2.0
- while time.time() < end_time:
- pending = self.sl._count_pending()
- self.assert_(pending >= 0 and pending <= 7)
- if pending == 7:
- break
- time.sleep(0.05)
- self.assertEqual(pending, 7)
+ # together
+ self.assertEqual(self.sl._count_pending(), 7)
# Release exclusive lock and wait
self.sl.release()
self._waitThreads()
# Check sequence
- shr_a = 0
- shr_c = 0
- for _ in xrange(10):
+ for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
# Shared locks aren't guaranteed to be notified in order, but they'll be
# first
tmp = self.done.get_nowait()
- if tmp == "shared A":
- shr_a += 1
- elif tmp == "shared C":
- shr_c += 1
- self.assertEqual(shr_a, 5)
- self.assertEqual(shr_c, 5)
+ if tmp == "shared 1":
+ shrcnt1 -= 1
+ elif tmp == "shared 2":
+ shrcnt2 -= 1
+ elif tmp == "shared 3":
+ shrcnt3 -= 1
+ elif tmp == "shared 4":
+ shrcnt4 -= 1
+ self.assertEqual(shrcnt1, 0)
+ self.assertEqual(shrcnt2, 0)
+ self.assertEqual(shrcnt3, 0)
+ self.assertEqual(shrcnt3, 0)
- for _ in xrange(3):
- self.assertEqual(self.done.get_nowait(), "exclusive B")
+ for _ in range(3):
+ self.assertEqual(self.done.get_nowait(), "exclusive 1")
- for _ in xrange(3):
- self.assertEqual(self.done.get_nowait(), "exclusive D")
+ for _ in range(3):
+ self.assertEqual(self.done.get_nowait(), "exclusive 2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testMixedAcquireTimeout(self):
- sync = threading.Condition()
+ sync = threading.Event()
def _AcquireShared(ev):
if not self.sl.acquire(shared=1, timeout=None):
# Notify main thread
ev.set()
- # Wait for notification
- sync.acquire()
- try:
- sync.wait()
- finally:
- sync.release()
+ # Wait for notification from main thread
+ sync.wait()
# Release lock
self.sl.release()
acquires = []
- for _ in xrange(3):
+ for _ in range(3):
ev = threading.Event()
self._addThread(target=_AcquireShared, args=(ev, ))
acquires.append(ev)
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Acquire exclusive without timeout
- exclsync = threading.Condition()
+ exclsync = threading.Event()
exclev = threading.Event()
def _AcquireExclusive():
# Notify main thread
exclev.set()
- exclsync.acquire()
- try:
- exclsync.wait()
- finally:
- exclsync.release()
+ # Wait for notification from main thread
+ exclsync.wait()
self.sl.release()
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Make all shared holders release their locks
- sync.acquire()
- try:
- sync.notifyAll()
- finally:
- sync.release()
+ sync.set()
# Wait for exclusive acquire to succeed
exclev.wait()
self.done.put("shared2")
self.sl.release()
- for _ in xrange(10):
+ for _ in range(10):
self._addThread(target=_AcquireSharedSimple)
# Tell exclusive lock to release
- exclsync.acquire()
- try:
- exclsync.notifyAll()
- finally:
- exclsync.release()
+ exclsync.set()
# Wait for everything to finish
self._waitThreads()
self.assertEqual(self.sl._count_pending(), 0)
# Check sequence
- for _ in xrange(3):
+ for _ in range(3):
self.assertEqual(self.done.get_nowait(), "shared")
self.assertEqual(self.done.get_nowait(), "exclusive")
- for _ in xrange(10):
+ for _ in range(10):
self.assertEqual(self.done.get_nowait(), "shared2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
+class TestSharedLockInCondition(_ThreadedTestCase):
+ """SharedLock as a condition lock tests"""
+
+ def setUp(self):
+ _ThreadedTestCase.setUp(self)
+ self.sl = locking.SharedLock("TestSharedLockInCondition")
+ self.setCondition()
+
+ def setCondition(self):
+ self.cond = threading.Condition(self.sl)
+
+ def testKeepMode(self):
+ self.cond.acquire(shared=1)
+ self.assert_(self.sl._is_owned(shared=1))
+ self.cond.wait(0)
+ self.assert_(self.sl._is_owned(shared=1))
+ self.cond.release()
+ self.cond.acquire(shared=0)
+ self.assert_(self.sl._is_owned(shared=0))
+ self.cond.wait(0)
+ self.assert_(self.sl._is_owned(shared=0))
+ self.cond.release()
+
+
+class TestSharedLockInPipeCondition(TestSharedLockInCondition):
+ """SharedLock as a pipe condition lock tests"""
+
+ def setCondition(self):
+ self.cond = locking.PipeCondition(self.sl)
+
+
class TestSSynchronizedDecorator(_ThreadedTestCase):
"""Shared Lock Synchronized decorator test"""
def testDecoratedFunctions(self):
self._doItExclusive()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock._is_owned())
self._doItSharer()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock._is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
def _setUpLS(self):
"""Helper to (re)initialize the lock set"""
self.resources = ['one', 'two', 'three']
- self.ls = locking.LockSet(members=self.resources)
+ self.ls = locking.LockSet(self.resources, "TestLockSet")
def testResources(self):
self.assertEquals(self.ls._names(), set(self.resources))
- newls = locking.LockSet()
+ newls = locking.LockSet([], "TestLockSet.testResources")
self.assertEquals(newls._names(), set())
def testAcquireRelease(self):
self.failUnlessEqual(self.done.get_nowait(), 'DONE')
@_Repeat
+ def testSimpleAcquireTimeoutExpiring(self):
+ names = sorted(self.ls._names())
+ self.assert_(len(names) >= 3)
+
+ # Get name of first lock
+ first = names[0]
+
+ # Get name of last lock
+ last = names.pop()
+
+ checks = [
+ # Block first and try to lock it again
+ (first, first),
+
+ # Block last and try to lock all locks
+ (None, first),
+
+ # Block last and try to lock it again
+ (last, last),
+ ]
+
+ for (wanted, block) in checks:
+ # Lock in exclusive mode
+ self.assert_(self.ls.acquire(block, shared=0))
+
+ def _AcquireOne():
+ # Try to get the same lock again with a timeout (should never succeed)
+ acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
+ if acquired:
+ self.done.put("acquired")
+ self.ls.release()
+ else:
+ self.assert_(acquired is None)
+ self.assertFalse(self.ls._list_owned())
+ self.assertFalse(self.ls._is_owned())
+ self.done.put("not acquired")
+
+ self._addThread(target=_AcquireOne)
+
+ # Wait for timeout in thread to expire
+ self._waitThreads()
+
+ # Release exclusive lock again
+ self.ls.release()
+
+ self.assertEqual(self.done.get_nowait(), "not acquired")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ @_Repeat
+ def testDelayedAndExpiringLockAcquire(self):
+ self._setUpLS()
+ self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
+
+ for expire in (False, True):
+ names = sorted(self.ls._names())
+ self.assertEqual(len(names), 8)
+
+ lock_ev = dict([(i, threading.Event()) for i in names])
+
+ # Lock all in exclusive mode
+ self.assert_(self.ls.acquire(names, shared=0))
+
+ if expire:
+ # We'll wait at least 300ms per lock
+ lockwait = len(names) * [0.3]
+
+ # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
+ # this gives us up to 2.4s to fail.
+ lockall_timeout = 0.4
+ else:
+ # This should finish rather quickly
+ lockwait = None
+ lockall_timeout = len(names) * 5.0
+
+ def _LockAll():
+ def acquire_notification(name):
+ if not expire:
+ self.done.put("getting %s" % name)
+
+ # Kick next lock
+ lock_ev[name].set()
+
+ if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
+ test_notify=acquire_notification):
+ self.done.put("got all")
+ self.ls.release()
+ else:
+ self.done.put("timeout on all")
+
+ # Notify all locks
+ for ev in lock_ev.values():
+ ev.set()
+
+ t = self._addThread(target=_LockAll)
+
+ for idx, name in enumerate(names):
+ # Wait for actual acquire on this lock to start
+ lock_ev[name].wait(10.0)
+
+ if expire and t.isAlive():
+ # Wait some time after getting the notification to make sure the lock
+ # acquire will expire
+ SafeSleep(lockwait[idx])
+
+ self.ls.release(names=name)
+
+ self.assertFalse(self.ls._list_owned())
+
+ self._waitThreads()
+
+ if expire:
+ # Not checking which locks were actually acquired. Doing so would be
+ # too timing-dependant.
+ self.assertEqual(self.done.get_nowait(), "timeout on all")
+ else:
+ for i in names:
+ self.assertEqual(self.done.get_nowait(), "getting %s" % i)
+ self.assertEqual(self.done.get_nowait(), "got all")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ @_Repeat
def testConcurrentRemove(self):
self.ls.add('four')
self.ls.acquire(['one', 'two', 'four'])
def testInitAndResources(self):
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager()
+ self.GL = locking.GanetiLockManager([], [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager(nodes=self.nodes)
+ self.GL = locking.GanetiLockManager(self.nodes, [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager(instances=self.instances)
+ self.GL = locking.GanetiLockManager([], self.instances)
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
+class TestLockMonitor(_ThreadedTestCase):
+ def setUp(self):
+ _ThreadedTestCase.setUp(self)
+ self.lm = locking.LockMonitor()
+
+ def testSingleThread(self):
+ locks = []
+
+ for i in range(100):
+ name = "TestLock%s" % i
+ locks.append(locking.SharedLock(name, monitor=self.lm))
+
+ self.assertEqual(len(self.lm._locks), len(locks))
+
+ self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
+ 100)
+
+ # Delete all locks
+ del locks[:]
+
+ # The garbage collector might needs some time
+ def _CheckLocks():
+ if self.lm._locks:
+ raise utils.RetryAgain()
+
+ utils.Retry(_CheckLocks, 0.1, 30.0)
+
+ self.assertFalse(self.lm._locks)
+
+ def testMultiThread(self):
+ locks = []
+
+ def _CreateLock(prev, next, name):
+ prev.wait()
+ locks.append(locking.SharedLock(name, monitor=self.lm))
+ if next:
+ next.set()
+
+ expnames = []
+
+ first = threading.Event()
+ prev = first
+
+ # Use a deterministic random generator
+ for i in random.Random(4263).sample(range(100), 33):
+ name = "MtTestLock%s" % i
+ expnames.append(name)
+
+ ev = threading.Event()
+ self._addThread(target=_CreateLock, args=(prev, ev, name))
+ prev = ev
+
+ # Add locks
+ first.set()
+ self._waitThreads()
+
+ # Check order in which locks were added
+ self.assertEqual([i.name for i in locks], expnames)
+
+ # Sync queries are not supported
+ self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
+
+ # Check query result
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
+ False),
+ [[name, None, None, []]
+ for name in utils.NiceSort(expnames)])
+
+ # Test exclusive acquire
+ for tlock in locks[::4]:
+ tlock.acquire(shared=0)
+ try:
+ def _GetExpResult(name):
+ if tlock.name == name:
+ return [name, "exclusive", [threading.currentThread().getName()],
+ []]
+ return [name, None, None, []]
+
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
+ "pending"], False),
+ [_GetExpResult(name)
+ for name in utils.NiceSort(expnames)])
+ finally:
+ tlock.release()
+
+ # Test shared acquire
+ def _Acquire(lock, shared, ev, notify):
+ lock.acquire(shared=shared)
+ try:
+ notify.set()
+ ev.wait()
+ finally:
+ lock.release()
+
+ for tlock1 in locks[::11]:
+ for tlock2 in locks[::-15]:
+ if tlock2 == tlock1:
+ # Avoid deadlocks
+ continue
+
+ for tlock3 in locks[::10]:
+ if tlock3 in (tlock2, tlock1):
+ # Avoid deadlocks
+ continue
+
+ releaseev = threading.Event()
+
+ # Acquire locks
+ acquireev = []
+ tthreads1 = []
+ for i in range(3):
+ ev = threading.Event()
+ tthreads1.append(self._addThread(target=_Acquire,
+ args=(tlock1, 1, releaseev, ev)))
+ acquireev.append(ev)
+
+ ev = threading.Event()
+ tthread2 = self._addThread(target=_Acquire,
+ args=(tlock2, 1, releaseev, ev))
+ acquireev.append(ev)
+
+ ev = threading.Event()
+ tthread3 = self._addThread(target=_Acquire,
+ args=(tlock3, 0, releaseev, ev))
+ acquireev.append(ev)
+
+ # Wait for all locks to be acquired
+ for i in acquireev:
+ i.wait()
+
+ # Check query result
+ for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
+ "owner"], False):
+ if name == tlock1.name:
+ self.assertEqual(mode, "shared")
+ self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
+ continue
+
+ if name == tlock2.name:
+ self.assertEqual(mode, "shared")
+ self.assertEqual(owner, [tthread2.getName()])
+ continue
+
+ if name == tlock3.name:
+ self.assertEqual(mode, "exclusive")
+ self.assertEqual(owner, [tthread3.getName()])
+ continue
+
+ self.assert_(name in expnames)
+ self.assert_(mode is None)
+ self.assert_(owner is None)
+
+ # Release locks again
+ releaseev.set()
+
+ self._waitThreads()
+
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+ [[name, None, None]
+ for name in utils.NiceSort(expnames)])
+
+ def testDelete(self):
+ lock = locking.SharedLock("TestLock", monitor=self.lm)
+
+ self.assertEqual(len(self.lm._locks), 1)
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+ [[lock.name, None, None]])
+
+ lock.delete()
+
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+ [[lock.name, "deleted", None]])
+ self.assertEqual(len(self.lm._locks), 1)
+
+ def testPending(self):
+ def _Acquire(lock, shared, prev, next):
+ prev.wait()
+
+ lock.acquire(shared=shared, test_notify=next.set)
+ try:
+ pass
+ finally:
+ lock.release()
+
+ lock = locking.SharedLock("ExcLock", monitor=self.lm)
+
+ for shared in [0, 1]:
+ lock.acquire()
+ try:
+ self.assertEqual(len(self.lm._locks), 1)
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+ [[lock.name, "exclusive",
+ [threading.currentThread().getName()]]])
+
+ threads = []
+
+ first = threading.Event()
+ prev = first
+
+ for i in range(5):
+ ev = threading.Event()
+ threads.append(self._addThread(target=_Acquire,
+ args=(lock, shared, prev, ev)))
+ prev = ev
+
+ # Start acquires
+ first.set()
+
+ # Wait for last acquire to start waiting
+ prev.wait()
+
+ # NOTE: This works only because QueryLocks will acquire the
+ # lock-internal lock again and won't be able to get the information
+ # until it has the lock. By then the acquire should be registered in
+ # SharedLock.__pending (otherwise it's a bug).
+
+ # All acquires are waiting now
+ if shared:
+ pending = [("shared", sorted([t.getName() for t in threads]))]
+ else:
+ pending = [("exclusive", [t.getName()]) for t in threads]
+
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
+ "pending"], False),
+ [[lock.name, "exclusive",
+ [threading.currentThread().getName()],
+ pending]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+ finally:
+ lock.release()
+
+ self._waitThreads()
+
+ # No pending acquires
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
+ False),
+ [[lock.name, None, None, []]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+
+
if __name__ == '__main__':
- unittest.main()
- #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
- #unittest.TextTestRunner(verbosity=2).run(suite)
+ testutils.GanetiTestProgram()