#!/usr/bin/python
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 0.0510-1301, USA.
+# 02110-1301, USA.
"""Script for unittesting the locking module"""
import time
import Queue
import threading
+import random
+import gc
+import itertools
+from ganeti import constants
from ganeti import locking
from ganeti import errors
+from ganeti import utils
+from ganeti import compat
+from ganeti import objects
+from ganeti import query
+
+import testutils
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
-_decoratorlock = locking.SharedLock()
+_decoratorlock = locking.SharedLock("decorator lock")
#: List for looping tests
ITERATIONS = range(8)
return wrapper
+def SafeSleep(duration):
+ start = time.time()
+ while True:
+ delay = start + duration - time.time()
+ if delay <= 0.0:
+ break
+ time.sleep(delay)
+
+
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
unittest.TestCase.setUp(self)
+ self.done = Queue.Queue(0)
self.threads = []
def _addThread(self, *args, **kwargs):
self.threads = []
-class TestPipeCondition(_ThreadedTestCase):
- """_PipeCondition tests"""
+class _ConditionTestCase(_ThreadedTestCase):
+ """Common test case for conditions"""
- def setUp(self):
+ def setUp(self, cls):
_ThreadedTestCase.setUp(self)
self.lock = threading.Lock()
- self.cond = locking._PipeCondition(self.lock)
- self.done = Queue.Queue(0)
+ self.cond = cls(self.lock)
- def testAcquireRelease(self):
- self.assert_(not self.cond._is_owned())
- self.assertRaises(RuntimeError, self.cond.wait)
+ def _testAcquireRelease(self):
+ self.assertFalse(self.cond._is_owned())
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.acquire()
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertFalse(self.cond._is_owned())
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
- def testNotification(self):
+ def _testNotification(self):
def _NotifyAll():
+ self.done.put("NE")
self.cond.acquire()
+ self.done.put("NA")
self.cond.notifyAll()
+ self.done.put("NN")
self.cond.release()
self.cond.acquire()
self._addThread(target=_NotifyAll)
- self.cond.wait()
+ self.assertEqual(self.done.get(True, 1), "NE")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+ self.cond.wait(None)
+ self.assertEqual(self.done.get(True, 1), "NA")
+ self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
+
+
+class TestSingleNotifyPipeCondition(_ConditionTestCase):
+ """SingleNotifyPipeCondition tests"""
+
+ def setUp(self):
+ _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
+
+ def testAcquireRelease(self):
+ self._testAcquireRelease()
+
+ def testNotification(self):
+ self._testNotification()
+
+ def testWaitReuse(self):
+ self.cond.acquire()
+ self.cond.wait(0)
+ self.cond.wait(0.1)
+ self.cond.release()
+
+ def testNoNotifyReuse(self):
+ self.cond.acquire()
+ self.cond.notifyAll()
+ self.assertRaises(RuntimeError, self.cond.wait, None)
+ self.assertRaises(RuntimeError, self.cond.notifyAll)
+ self.cond.release()
+
+
+class TestPipeCondition(_ConditionTestCase):
+ """PipeCondition tests"""
+
+ def setUp(self):
+ _ConditionTestCase.setUp(self, locking.PipeCondition)
+
+ def testAcquireRelease(self):
+ self._testAcquireRelease()
+
+ def testNotification(self):
+ self._testNotification()
def _TestWait(self, fn):
- self._addThread(target=fn)
- self._addThread(target=fn)
- self._addThread(target=fn)
+ threads = [
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ ]
# Wait for threads to be waiting
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
+ for _ in threads:
+ self.assertEqual(self.done.get(True, 1), "A")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.acquire()
- self.assertEqual(self.cond._nwaiters, 3)
- # This new thread can"t acquire the lock, and thus call wait, before we
+ self.assertEqual(len(self.cond._waiters), 3)
+ self.assertEqual(self.cond._waiters, set(threads))
+
+ self.assertTrue(repr(self.cond).startswith("<"))
+ self.assertTrue("waiters=" in repr(self.cond))
+
+ # This new thread can't acquire the lock, and thus call wait, before we
# release it
self._addThread(target=fn)
self.cond.notifyAll()
def _BlockingWait():
self.cond.acquire()
self.done.put("A")
- self.cond.wait()
+ self.cond.wait(None)
self.cond.release()
self.done.put("W")
self.assertRaises(Queue.Empty, self.done.get_nowait)
-class TestSingleActionPipeCondition(unittest.TestCase):
- """_SingleActionPipeCondition tests"""
-
- def setUp(self):
- self.cond = locking._SingleActionPipeCondition()
-
- def testInitialization(self):
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
- self.assertEqual(self.cond._nwaiters, 0)
-
- def testUsageCount(self):
- self.cond.StartWaiting()
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
- self.assertEqual(self.cond._nwaiters, 1)
-
- # use again
- self.cond.StartWaiting()
- self.assertEqual(self.cond._nwaiters, 2)
-
- # there is more than one user
- self.assert_(not self.cond.DoneWaiting())
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
- self.assertEqual(self.cond._nwaiters, 1)
-
- self.assert_(self.cond.DoneWaiting())
- self.assertEqual(self.cond._nwaiters, 0)
- self.assert_(self.cond._read_fd is None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is None)
-
- def testNotify(self):
- wait1 = self.cond.StartWaiting()
- wait2 = self.cond.StartWaiting()
-
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
-
- self.cond.notifyAll()
-
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is not None)
-
- self.assert_(not self.cond.DoneWaiting())
-
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is not None)
-
- self.assert_(self.cond.DoneWaiting())
-
- self.assert_(self.cond._read_fd is None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is None)
-
- def testReusage(self):
- self.cond.StartWaiting()
- self.assert_(self.cond._read_fd is not None)
- self.assert_(self.cond._write_fd is not None)
- self.assert_(self.cond._poller is not None)
-
- self.assert_(self.cond.DoneWaiting())
-
- self.assertRaises(RuntimeError, self.cond.StartWaiting)
- self.assert_(self.cond._read_fd is None)
- self.assert_(self.cond._write_fd is None)
- self.assert_(self.cond._poller is None)
-
- def testNotifyTwice(self):
- self.cond.notifyAll()
- self.assertRaises(RuntimeError, self.cond.notifyAll)
-
-
class TestSharedLock(_ThreadedTestCase):
"""SharedLock tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
- self.sl = locking.SharedLock()
- # helper threads use the 'done' queue to tell the master they finished.
- self.done = Queue.Queue(0)
+ self.sl = locking.SharedLock("TestSharedLock")
+
+ self.assertTrue(repr(self.sl).startswith("<"))
+ self.assertTrue("name=TestSharedLock" in repr(self.sl))
def testSequenceAndOwnership(self):
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
self.sl.acquire(shared=1)
- self.assert_(self.sl._is_owned())
- self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned())
+ self.assert_(self.sl.is_owned(shared=1))
+ self.assertFalse(self.sl.is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
self.sl.acquire()
- self.assert_(self.sl._is_owned())
- self.assert_(not self.sl._is_owned(shared=1))
- self.assert_(self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned())
+ self.assertFalse(self.sl.is_owned(shared=1))
+ self.assert_(self.sl.is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
self.sl.acquire(shared=1)
- self.assert_(self.sl._is_owned())
- self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned())
+ self.assert_(self.sl.is_owned(shared=1))
+ self.assertFalse(self.sl.is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
def _doItSharer(self):
try:
self.sl.acquire(shared=1)
- self.done.put('SHR')
+ self.done.put("SHR")
self.sl.release()
except errors.LockError:
- self.done.put('ERR')
+ self.done.put("ERR")
def _doItExclusive(self):
try:
self.sl.acquire()
- self.done.put('EXC')
+ self.done.put("EXC")
self.sl.release()
except errors.LockError:
- self.done.put('ERR')
+ self.done.put("ERR")
def _doItDelete(self):
try:
self.sl.delete()
- self.done.put('DEL')
+ self.done.put("DEL")
except errors.LockError:
- self.done.put('ERR')
+ self.done.put("ERR")
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+ self.failUnlessEqual(self.done.get_nowait(), "EXC")
@_Repeat
def testExclusiveBlocksDelete(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'DEL')
- self.sl = locking.SharedLock()
+ self.failUnlessEqual(self.done.get_nowait(), "DEL")
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveBlocksSharer(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'SHR')
+ self.failUnlessEqual(self.done.get_nowait(), "SHR")
@_Repeat
def testSharerBlocksExclusive(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+ self.failUnlessEqual(self.done.get_nowait(), "EXC")
@_Repeat
def testSharerBlocksDelete(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'DEL')
- self.sl = locking.SharedLock()
+ self.failUnlessEqual(self.done.get_nowait(), "DEL")
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testWaitingExclusiveBlocksSharer(self):
self.sl.release()
self._waitThreads()
# The exclusive passed before
- self.failUnlessEqual(self.done.get_nowait(), 'EXC')
- self.failUnlessEqual(self.done.get_nowait(), 'SHR')
+ self.failUnlessEqual(self.done.get_nowait(), "EXC")
+ self.failUnlessEqual(self.done.get_nowait(), "SHR")
@_Repeat
def testWaitingSharerBlocksExclusive(self):
self.sl.release()
self._waitThreads()
# The sharer passed before
- self.assertEqual(self.done.get_nowait(), 'SHR')
- self.assertEqual(self.done.get_nowait(), 'EXC')
+ self.assertEqual(self.done.get_nowait(), "SHR")
+ self.assertEqual(self.done.get_nowait(), "EXC")
def testDelete(self):
self.sl.delete()
self.assertRaises(errors.LockError, self.sl.delete)
def testDeleteTimeout(self):
- self.sl.delete(timeout=60)
+ self.assertTrue(self.sl.delete(timeout=60))
+
+ def testDeleteTimeoutFail(self):
+ ready = threading.Event()
+ finish = threading.Event()
+
+ def fn():
+ self.sl.acquire(shared=0)
+ ready.set()
+
+ finish.wait()
+ self.sl.release()
+
+ self._addThread(target=fn)
+ ready.wait()
+
+ # Test if deleting a lock owned in exclusive mode by another thread fails
+ # to delete when a timeout is used
+ self.assertFalse(self.sl.delete(timeout=0.02))
+
+ finish.set()
+ self._waitThreads()
+
+ self.assertTrue(self.sl.delete())
+ self.assertRaises(errors.LockError, self.sl.acquire)
def testNoDeleteIfSharer(self):
self.sl.acquire(shared=1)
self._waitThreads()
# The threads who were pending return ERR
for _ in range(4):
- self.assertEqual(self.done.get_nowait(), 'ERR')
- self.sl = locking.SharedLock()
+ self.assertEqual(self.done.get_nowait(), "ERR")
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testDeletePendingDeleteExclusiveSharers(self):
self.sl.delete()
self._waitThreads()
# The two threads who were pending return both ERR
- self.assertEqual(self.done.get_nowait(), 'ERR')
- self.assertEqual(self.done.get_nowait(), 'ERR')
- self.assertEqual(self.done.get_nowait(), 'ERR')
- self.assertEqual(self.done.get_nowait(), 'ERR')
- self.sl = locking.SharedLock()
+ self.assertEqual(self.done.get_nowait(), "ERR")
+ self.assertEqual(self.done.get_nowait(), "ERR")
+ self.assertEqual(self.done.get_nowait(), "ERR")
+ self.assertEqual(self.done.get_nowait(), "ERR")
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveAcquireTimeout(self):
- def _LockExclusive(wait):
- self.sl.acquire(shared=0)
- self.done.put("A: start sleep")
- time.sleep(wait)
- self.done.put("A: end sleep")
- self.sl.release()
-
for shared in [0, 1]:
- # Start thread to hold lock for 20 ms
- self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
+ on_queue = threading.Event()
+ release_exclusive = threading.Event()
+
+ def _LockExclusive():
+ self.sl.acquire(shared=0, test_notify=on_queue.set)
+ self.done.put("A: start wait")
+ release_exclusive.wait()
+ self.done.put("A: end wait")
+ self.sl.release()
- # Wait for sleep to begin
- self.assertEqual(self.done.get(), "A: start sleep")
+ # Start thread to hold lock in exclusive mode
+ self._addThread(target=_LockExclusive)
+
+ # Wait for wait to begin
+ self.assertEqual(self.done.get(timeout=60), "A: start wait")
+
+ # Wait up to 60s to get lock, but release exclusive lock as soon as we're
+ # on the queue
+ self.failUnless(self.sl.acquire(shared=shared, timeout=60,
+ test_notify=release_exclusive.set))
- # Wait up to 100 ms to get lock
- self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
self.done.put("got 2nd")
self.sl.release()
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), "A: end sleep")
+ self.assertEqual(self.done.get_nowait(), "A: end wait")
self.assertEqual(self.done.get_nowait(), "got 2nd")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.acquire()
# Start shared acquires with timeout between 0 and 20 ms
- for i in xrange(11):
+ for i in range(11):
self._addThread(target=_AcquireWithTimeout,
args=(shared, i * 2.0 / 1000.0))
# Release lock
self.sl.release()
- for _ in xrange(11):
+ for _ in range(11):
self.assertEqual(self.done.get_nowait(), "timeout")
self.assertRaises(Queue.Empty, self.done.get_nowait)
# Tests whether shared acquires jump in front of exclusive acquires in the
# queue.
- # Get exclusive lock while we fill the queue
- self.sl.acquire()
+ def _Acquire(shared, name, notify_ev, wait_ev):
+ if notify_ev:
+ notify_fn = notify_ev.set
+ else:
+ notify_fn = None
- def _Acquire(shared, name):
- if not self.sl.acquire(shared=shared):
+ if wait_ev:
+ wait_ev.wait()
+
+ if not self.sl.acquire(shared=shared, test_notify=notify_fn):
return
self.done.put(name)
self.sl.release()
- # Start shared acquires
- for _ in xrange(5):
- self._addThread(target=_Acquire, args=(1, "shared A"))
+ # Get exclusive lock while we fill the queue
+ self.sl.acquire()
+
+ shrcnt1 = 5
+ shrcnt2 = 7
+ shrcnt3 = 9
+ shrcnt4 = 2
+
+ # Add acquires using threading.Event for synchronization. They'll be
+ # acquired exactly in the order defined in this list.
+ acquires = (shrcnt1 * [(1, "shared 1")] +
+ 3 * [(0, "exclusive 1")] +
+ shrcnt2 * [(1, "shared 2")] +
+ shrcnt3 * [(1, "shared 3")] +
+ shrcnt4 * [(1, "shared 4")] +
+ 3 * [(0, "exclusive 2")])
- # Start exclusive acquires
- for _ in xrange(3):
- self._addThread(target=_Acquire, args=(0, "exclusive B"))
+ ev_cur = None
+ ev_prev = None
- # More shared acquires
- for _ in xrange(5):
- self._addThread(target=_Acquire, args=(1, "shared C"))
+ for args in acquires:
+ ev_cur = threading.Event()
+ self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
+ ev_prev = ev_cur
- # More exclusive acquires
- for _ in xrange(3):
- self._addThread(target=_Acquire, args=(0, "exclusive D"))
+ # Wait for last acquire to start
+ ev_prev.wait()
# Expect 6 pending exclusive acquires and 1 for all shared acquires
- # together. There's no way to wait for SharedLock.acquire to start
- # its work. Hence the timeout of 2 seconds.
- pending = 0
- end_time = time.time() + 2.0
- while time.time() < end_time:
- pending = self.sl._count_pending()
- self.assert_(pending >= 0 and pending <= 7)
- if pending == 7:
- break
- time.sleep(0.05)
- self.assertEqual(pending, 7)
+ # together
+ self.assertEqual(self.sl._count_pending(), 7)
# Release exclusive lock and wait
self.sl.release()
self._waitThreads()
# Check sequence
- shr_a = 0
- shr_c = 0
- for _ in xrange(10):
+ for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
# Shared locks aren't guaranteed to be notified in order, but they'll be
# first
tmp = self.done.get_nowait()
- if tmp == "shared A":
- shr_a += 1
- elif tmp == "shared C":
- shr_c += 1
- self.assertEqual(shr_a, 5)
- self.assertEqual(shr_c, 5)
+ if tmp == "shared 1":
+ shrcnt1 -= 1
+ elif tmp == "shared 2":
+ shrcnt2 -= 1
+ elif tmp == "shared 3":
+ shrcnt3 -= 1
+ elif tmp == "shared 4":
+ shrcnt4 -= 1
+ self.assertEqual(shrcnt1, 0)
+ self.assertEqual(shrcnt2, 0)
+ self.assertEqual(shrcnt3, 0)
+ self.assertEqual(shrcnt3, 0)
- for _ in xrange(3):
- self.assertEqual(self.done.get_nowait(), "exclusive B")
+ for _ in range(3):
+ self.assertEqual(self.done.get_nowait(), "exclusive 1")
- for _ in xrange(3):
- self.assertEqual(self.done.get_nowait(), "exclusive D")
+ for _ in range(3):
+ self.assertEqual(self.done.get_nowait(), "exclusive 2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
+ def testIllegalDowngrade(self):
+ # Not yet acquired
+ self.assertRaises(AssertionError, self.sl.downgrade)
+
+ # Acquire in shared mode, downgrade should be no-op
+ self.assertTrue(self.sl.acquire(shared=1))
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.assertTrue(self.sl.downgrade())
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ def testDowngrade(self):
+ self.assertTrue(self.sl.acquire())
+ self.assertTrue(self.sl.is_owned(shared=0))
+ self.assertTrue(self.sl.downgrade())
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ @_Repeat
+ def testDowngradeJumpsAheadOfExclusive(self):
+ def _KeepExclusive(ev_got, ev_downgrade, ev_release):
+ self.assertTrue(self.sl.acquire())
+ self.assertTrue(self.sl.is_owned(shared=0))
+ ev_got.set()
+ ev_downgrade.wait()
+ self.assertTrue(self.sl.is_owned(shared=0))
+ self.assertTrue(self.sl.downgrade())
+ self.assertTrue(self.sl.is_owned(shared=1))
+ ev_release.wait()
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ def _KeepExclusive2(ev_started, ev_release):
+ self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
+ self.assertTrue(self.sl.is_owned(shared=0))
+ ev_release.wait()
+ self.assertTrue(self.sl.is_owned(shared=0))
+ self.sl.release()
+
+ def _KeepShared(ev_started, ev_got, ev_release):
+ self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
+ self.assertTrue(self.sl.is_owned(shared=1))
+ ev_got.set()
+ ev_release.wait()
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ # Acquire lock in exclusive mode
+ ev_got_excl1 = threading.Event()
+ ev_downgrade_excl1 = threading.Event()
+ ev_release_excl1 = threading.Event()
+ th_excl1 = self._addThread(target=_KeepExclusive,
+ args=(ev_got_excl1, ev_downgrade_excl1,
+ ev_release_excl1))
+ ev_got_excl1.wait()
+
+ # Start a second exclusive acquire
+ ev_started_excl2 = threading.Event()
+ ev_release_excl2 = threading.Event()
+ th_excl2 = self._addThread(target=_KeepExclusive2,
+ args=(ev_started_excl2, ev_release_excl2))
+ ev_started_excl2.wait()
+
+ # Start shared acquires, will jump ahead of second exclusive acquire when
+ # first exclusive acquire downgrades
+ ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
+ ev_release_shared = threading.Event()
+
+ th_shared = [self._addThread(target=_KeepShared,
+ args=(ev_started, ev_got, ev_release_shared))
+ for (ev_started, ev_got) in ev_shared]
+
+ # Wait for all shared acquires to start
+ for (ev, _) in ev_shared:
+ ev.wait()
+
+ # Check lock information
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+ [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
+ [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
+ self.assertEqual([(pendmode, sorted(waiting))
+ for (pendmode, waiting) in pending],
+ [("exclusive", [th_excl2.getName()]),
+ ("shared", sorted(th.getName() for th in th_shared))])
+
+ # Shared acquires won't start until the exclusive lock is downgraded
+ ev_downgrade_excl1.set()
+
+ # Wait for all shared acquires to be successful
+ for (_, ev) in ev_shared:
+ ev.wait()
+
+ # Check lock information again
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
+ query.LQ_PENDING])),
+ [(self.sl.name, "shared", None,
+ [("exclusive", [th_excl2.getName()])])])
+ [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
+ self.assertEqual(set(owner), set([th_excl1.getName()] +
+ [th.getName() for th in th_shared]))
+
+ ev_release_excl1.set()
+ ev_release_excl2.set()
+ ev_release_shared.set()
+
+ self._waitThreads()
+
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
+ query.LQ_PENDING])),
+ [(self.sl.name, None, None, [])])
+
@_Repeat
def testMixedAcquireTimeout(self):
- sync = threading.Condition()
+ sync = threading.Event()
def _AcquireShared(ev):
if not self.sl.acquire(shared=1, timeout=None):
# Notify main thread
ev.set()
- # Wait for notification
- sync.acquire()
- try:
- sync.wait()
- finally:
- sync.release()
+ # Wait for notification from main thread
+ sync.wait()
# Release lock
self.sl.release()
acquires = []
- for _ in xrange(3):
+ for _ in range(3):
ev = threading.Event()
self._addThread(target=_AcquireShared, args=(ev, ))
acquires.append(ev)
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Acquire exclusive without timeout
- exclsync = threading.Condition()
+ exclsync = threading.Event()
exclev = threading.Event()
def _AcquireExclusive():
# Notify main thread
exclev.set()
- exclsync.acquire()
- try:
- exclsync.wait()
- finally:
- exclsync.release()
+ # Wait for notification from main thread
+ exclsync.wait()
self.sl.release()
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Make all shared holders release their locks
- sync.acquire()
- try:
- sync.notifyAll()
- finally:
- sync.release()
+ sync.set()
# Wait for exclusive acquire to succeed
exclev.wait()
self.done.put("shared2")
self.sl.release()
- for _ in xrange(10):
+ for _ in range(10):
self._addThread(target=_AcquireSharedSimple)
# Tell exclusive lock to release
- exclsync.acquire()
- try:
- exclsync.notifyAll()
- finally:
- exclsync.release()
+ exclsync.set()
# Wait for everything to finish
self._waitThreads()
self.assertEqual(self.sl._count_pending(), 0)
# Check sequence
- for _ in xrange(3):
+ for _ in range(3):
self.assertEqual(self.done.get_nowait(), "shared")
self.assertEqual(self.done.get_nowait(), "exclusive")
- for _ in xrange(10):
+ for _ in range(10):
self.assertEqual(self.done.get_nowait(), "shared2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
+ def testPriority(self):
+ # Acquire in exclusive mode
+ self.assert_(self.sl.acquire(shared=0))
+
+ # Queue acquires
+ def _Acquire(prev, next, shared, priority, result):
+ prev.wait()
+ self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
+ try:
+ self.done.put(result)
+ finally:
+ self.sl.release()
+
+ counter = itertools.count(0)
+ priorities = range(-20, 30)
+ first = threading.Event()
+ prev = first
+
+ # Data structure:
+ # {
+ # priority:
+ # [(shared/exclusive, set(acquire names), set(pending threads)),
+ # (shared/exclusive, ...),
+ # ...,
+ # ],
+ # }
+ perprio = {}
+
+ # References shared acquire per priority in L{perprio}. Data structure:
+ # {
+ # priority: (shared=1, set(acquire names), set(pending threads)),
+ # }
+ prioshared = {}
+
+ for seed in [4979, 9523, 14902, 32440]:
+ # Use a deterministic random generator
+ rnd = random.Random(seed)
+ for priority in [rnd.choice(priorities) for _ in range(30)]:
+ modes = [0, 1]
+ rnd.shuffle(modes)
+ for shared in modes:
+ # Unique name
+ acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
+
+ ev = threading.Event()
+ thread = self._addThread(target=_Acquire,
+ args=(prev, ev, shared, priority, acqname))
+ prev = ev
+
+ # Record expected aqcuire, see above for structure
+ data = (shared, set([acqname]), set([thread]))
+ priolist = perprio.setdefault(priority, [])
+ if shared:
+ priosh = prioshared.get(priority, None)
+ if priosh:
+ # Shared acquires are merged
+ for i, j in zip(priosh[1:], data[1:]):
+ i.update(j)
+ assert data[0] == priosh[0]
+ else:
+ prioshared[priority] = data
+ priolist.append(data)
+ else:
+ priolist.append(data)
+
+ # Start all acquires and wait for them
+ first.set()
+ prev.wait()
+
+ # Check lock information
+ self.assertEqual(self.sl.GetLockInfo(set()),
+ [(self.sl.name, None, None, None)])
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+ [(self.sl.name, "exclusive",
+ [threading.currentThread().getName()], None)])
+
+ self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
+ perprio)
+
+ # Let threads acquire the lock
+ self.sl.release()
+
+ # Wait for everything to finish
+ self._waitThreads()
+
+ self.assert_(self.sl._check_empty())
+
+ # Check acquires by priority
+ for acquires in [perprio[i] for i in sorted(perprio.keys())]:
+ for (_, names, _) in acquires:
+ # For shared acquires, the set will contain 1..n entries. For exclusive
+ # acquires only one.
+ while names:
+ names.remove(self.done.get_nowait())
+ self.assertFalse(compat.any(names for (_, names, _) in acquires))
+
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
+ self.assertEqual(name, self.sl.name)
+ self.assert_(mode is None)
+ self.assert_(owner is None)
+
+ self.assertEqual([(pendmode, sorted(waiting))
+ for (pendmode, waiting) in pending],
+ [(["exclusive", "shared"][int(bool(shared))],
+ sorted(t.getName() for t in threads))
+ for acquires in [perprio[i]
+ for i in sorted(perprio.keys())]
+ for (shared, _, threads) in acquires])
+
+ class _FakeTimeForSpuriousNotifications:
+ def __init__(self, now, check_end):
+ self.now = now
+ self.check_end = check_end
+
+ # Deterministic random number generator
+ self.rnd = random.Random(15086)
+
+ def time(self):
+ # Advance time if the random number generator thinks so (this is to test
+ # multiple notifications without advancing the time)
+ if self.rnd.random() < 0.3:
+ self.now += self.rnd.random()
+
+ self.check_end(self.now)
+
+ return self.now
+
+ @_Repeat
+ def testAcquireTimeoutWithSpuriousNotifications(self):
+ ready = threading.Event()
+ locked = threading.Event()
+ req = Queue.Queue(0)
+
+ epoch = 4000.0
+ timeout = 60.0
+
+ def check_end(now):
+ self.assertFalse(locked.isSet())
+
+ # If we waited long enough (in virtual time), tell main thread to release
+ # lock, otherwise tell it to notify once more
+ req.put(now < (epoch + (timeout * 0.8)))
+
+ time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+ sl = locking.SharedLock("test", _time_fn=time_fn)
+
+ # Acquire in exclusive mode
+ sl.acquire(shared=0)
+
+ def fn():
+ self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+ test_notify=ready.set))
+ locked.set()
+ sl.release()
+ self.done.put("success")
+
+ # Start acquire with timeout and wait for it to be ready
+ self._addThread(target=fn)
+ ready.wait()
+
+ # The separate thread is now waiting to acquire the lock, so start sending
+ # spurious notifications.
+
+ # Wait for separate thread to ask for another notification
+ count = 0
+ while req.get():
+ # After sending the notification, the lock will take a short amount of
+ # time to notice and to retrieve the current time
+ sl._notify_topmost()
+ count += 1
+
+ self.assertTrue(count > 100, "Not enough notifications were sent")
+
+ self.assertFalse(locked.isSet())
+
+ # Some notifications have been sent, now actually release the lock
+ sl.release()
+
+ # Wait for lock to be acquired
+ locked.wait()
+
+ self._waitThreads()
+
+ self.assertEqual(self.done.get_nowait(), "success")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestSharedLockInCondition(_ThreadedTestCase):
+ """SharedLock as a condition lock tests"""
+
+ def setUp(self):
+ _ThreadedTestCase.setUp(self)
+ self.sl = locking.SharedLock("TestSharedLockInCondition")
+ self.setCondition()
+
+ def setCondition(self):
+ self.cond = threading.Condition(self.sl)
+
+ def testKeepMode(self):
+ self.cond.acquire(shared=1)
+ self.assert_(self.sl.is_owned(shared=1))
+ self.cond.wait(0)
+ self.assert_(self.sl.is_owned(shared=1))
+ self.cond.release()
+ self.cond.acquire(shared=0)
+ self.assert_(self.sl.is_owned(shared=0))
+ self.cond.wait(0)
+ self.assert_(self.sl.is_owned(shared=0))
+ self.cond.release()
+
+
+class TestSharedLockInPipeCondition(TestSharedLockInCondition):
+ """SharedLock as a pipe condition lock tests"""
+
+ def setCondition(self):
+ self.cond = locking.PipeCondition(self.sl)
+
class TestSSynchronizedDecorator(_ThreadedTestCase):
"""Shared Lock Synchronized decorator test"""
def setUp(self):
_ThreadedTestCase.setUp(self)
- # helper threads use the 'done' queue to tell the master they finished.
- self.done = Queue.Queue(0)
@locking.ssynchronized(_decoratorlock)
def _doItExclusive(self):
- self.assert_(_decoratorlock._is_owned())
- self.done.put('EXC')
+ self.assert_(_decoratorlock.is_owned())
+ self.done.put("EXC")
@locking.ssynchronized(_decoratorlock, shared=1)
def _doItSharer(self):
- self.assert_(_decoratorlock._is_owned(shared=1))
- self.done.put('SHR')
+ self.assert_(_decoratorlock.is_owned(shared=1))
+ self.done.put("SHR")
def testDecoratedFunctions(self):
self._doItExclusive()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock.is_owned())
self._doItSharer()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock.is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
self.assertRaises(Queue.Empty, self.done.get_nowait)
_decoratorlock.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+ self.failUnlessEqual(self.done.get_nowait(), "EXC")
@_Repeat
def testExclusiveBlocksSharer(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
_decoratorlock.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'SHR')
+ self.failUnlessEqual(self.done.get_nowait(), "SHR")
@_Repeat
def testSharerBlocksExclusive(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
_decoratorlock.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+ self.failUnlessEqual(self.done.get_nowait(), "EXC")
class TestLockSet(_ThreadedTestCase):
def setUp(self):
_ThreadedTestCase.setUp(self)
self._setUpLS()
- # helper threads use the 'done' queue to tell the master they finished.
- self.done = Queue.Queue(0)
def _setUpLS(self):
"""Helper to (re)initialize the lock set"""
- self.resources = ['one', 'two', 'three']
- self.ls = locking.LockSet(members=self.resources)
+ self.resources = ["one", "two", "three"]
+ self.ls = locking.LockSet(self.resources, "TestLockSet")
def testResources(self):
self.assertEquals(self.ls._names(), set(self.resources))
- newls = locking.LockSet()
+ newls = locking.LockSet([], "TestLockSet.testResources")
self.assertEquals(newls._names(), set())
+ def testCheckOwnedUnknown(self):
+ self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
+ for shared in [-1, 0, 1, 6378, 24255]:
+ self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
+ shared=shared))
+
+ def testCheckOwnedUnknownWhileHolding(self):
+ self.assertFalse(self.ls.check_owned([]))
+ self.ls.acquire("one", shared=1)
+ self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertFalse(self.ls.check_owned(["one", "two"]))
+ self.assertRaises(errors.LockError, self.ls.check_owned,
+ ["one", "nonexist"])
+ self.assertRaises(errors.LockError, self.ls.check_owned, "")
+ self.ls.release()
+ self.assertFalse(self.ls.check_owned([]))
+ self.assertFalse(self.ls.check_owned("one"))
+
def testAcquireRelease(self):
- self.assert_(self.ls.acquire('one'))
- self.assertEquals(self.ls._list_owned(), set(['one']))
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
+ self.assert_(self.ls.acquire("one"))
+ self.assertEquals(self.ls.list_owned(), set(["one"]))
+ self.assertTrue(self.ls.check_owned("one"))
+ self.assertTrue(self.ls.check_owned("one", shared=0))
+ self.assertFalse(self.ls.check_owned("one", shared=1))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
- self.assertEquals(self.ls.acquire(['one']), set(['one']))
- self.assertEquals(self.ls._list_owned(), set(['one']))
+ self.assertEquals(self.ls.list_owned(), set())
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
+ self.assertEquals(self.ls.acquire(["one"]), set(["one"]))
+ self.assertEquals(self.ls.list_owned(), set(["one"]))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
- self.ls.acquire(['one', 'two', 'three'])
- self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
- self.ls.release('one')
- self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
- self.ls.release(['three'])
- self.assertEquals(self.ls._list_owned(), set(['two']))
+ self.assertEquals(self.ls.list_owned(), set())
+ self.ls.acquire(["one", "two", "three"])
+ self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
+ self.assertTrue(self.ls.check_owned(self.ls._names()))
+ self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+ self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
+ self.ls.release("one")
+ self.assertFalse(self.ls.check_owned(["one"]))
+ self.assertTrue(self.ls.check_owned(["two", "three"]))
+ self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
+ self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
+ self.assertEquals(self.ls.list_owned(), set(["two", "three"]))
+ self.ls.release(["three"])
+ self.assertEquals(self.ls.list_owned(), set(["two"]))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
- self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
- self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+ self.assertEquals(self.ls.list_owned(), set())
+ self.assertEquals(self.ls.acquire(["one", "three"]), set(["one", "three"]))
+ self.assertEquals(self.ls.list_owned(), set(["one", "three"]))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
+ for name in self.ls._names():
+ self.assertFalse(self.ls.check_owned(name))
def testNoDoubleAcquire(self):
- self.ls.acquire('one')
- self.assertRaises(AssertionError, self.ls.acquire, 'one')
- self.assertRaises(AssertionError, self.ls.acquire, ['two'])
- self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
+ self.ls.acquire("one")
+ self.assertRaises(AssertionError, self.ls.acquire, "one")
+ self.assertRaises(AssertionError, self.ls.acquire, ["two"])
+ self.assertRaises(AssertionError, self.ls.acquire, ["two", "three"])
self.ls.release()
- self.ls.acquire(['one', 'three'])
- self.ls.release('one')
- self.assertRaises(AssertionError, self.ls.acquire, ['two'])
- self.ls.release('three')
+ self.ls.acquire(["one", "three"])
+ self.ls.release("one")
+ self.assertRaises(AssertionError, self.ls.acquire, ["two"])
+ self.ls.release("three")
def testNoWrongRelease(self):
self.assertRaises(AssertionError, self.ls.release)
- self.ls.acquire('one')
- self.assertRaises(AssertionError, self.ls.release, 'two')
+ self.ls.acquire("one")
+ self.assertRaises(AssertionError, self.ls.release, "two")
def testAddRemove(self):
- self.ls.add('four')
- self.assertEquals(self.ls._list_owned(), set())
- self.assert_('four' in self.ls._names())
- self.ls.add(['five', 'six', 'seven'], acquired=1)
- self.assert_('five' in self.ls._names())
- self.assert_('six' in self.ls._names())
- self.assert_('seven' in self.ls._names())
- self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
- self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
- self.assert_('five' not in self.ls._names())
- self.assert_('six' not in self.ls._names())
- self.assertEquals(self.ls._list_owned(), set(['seven']))
- self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
- self.ls.remove('seven')
- self.assert_('seven' not in self.ls._names())
- self.assertEquals(self.ls._list_owned(), set([]))
+ self.ls.add("four")
+ self.assertEquals(self.ls.list_owned(), set())
+ self.assert_("four" in self.ls._names())
+ self.ls.add(["five", "six", "seven"], acquired=1)
+ self.assert_("five" in self.ls._names())
+ self.assert_("six" in self.ls._names())
+ self.assert_("seven" in self.ls._names())
+ self.assertEquals(self.ls.list_owned(), set(["five", "six", "seven"]))
+ self.assertEquals(self.ls.remove(["five", "six"]), ["five", "six"])
+ self.assert_("five" not in self.ls._names())
+ self.assert_("six" not in self.ls._names())
+ self.assertEquals(self.ls.list_owned(), set(["seven"]))
+ self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
+ self.ls.remove("seven")
+ self.assert_("seven" not in self.ls._names())
+ self.assertEquals(self.ls.list_owned(), set([]))
self.ls.acquire(None, shared=1)
- self.assertRaises(AssertionError, self.ls.add, 'eight')
+ self.assertRaises(AssertionError, self.ls.add, "eight")
self.ls.release()
self.ls.acquire(None)
- self.ls.add('eight', acquired=1)
- self.assert_('eight' in self.ls._names())
- self.assert_('eight' in self.ls._list_owned())
- self.ls.add('nine')
- self.assert_('nine' in self.ls._names())
- self.assert_('nine' not in self.ls._list_owned())
+ self.ls.add("eight", acquired=1)
+ self.assert_("eight" in self.ls._names())
+ self.assert_("eight" in self.ls.list_owned())
+ self.ls.add("nine")
+ self.assert_("nine" in self.ls._names())
+ self.assert_("nine" not in self.ls.list_owned())
self.ls.release()
- self.ls.remove(['two'])
- self.assert_('two' not in self.ls._names())
- self.ls.acquire('three')
- self.assertEquals(self.ls.remove(['three']), ['three'])
- self.assert_('three' not in self.ls._names())
- self.assertEquals(self.ls.remove('three'), [])
- self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
- self.assert_('one' not in self.ls._names())
+ self.ls.remove(["two"])
+ self.assert_("two" not in self.ls._names())
+ self.ls.acquire("three")
+ self.assertEquals(self.ls.remove(["three"]), ["three"])
+ self.assert_("three" not in self.ls._names())
+ self.assertEquals(self.ls.remove("three"), [])
+ self.assertEquals(self.ls.remove(["one", "three", "six"]), ["one"])
+ self.assert_("one" not in self.ls._names())
def testRemoveNonBlocking(self):
- self.ls.acquire('one')
- self.assertEquals(self.ls.remove('one'), ['one'])
- self.ls.acquire(['two', 'three'])
- self.assertEquals(self.ls.remove(['two', 'three']),
- ['two', 'three'])
+ self.ls.acquire("one")
+ self.assertEquals(self.ls.remove("one"), ["one"])
+ self.ls.acquire(["two", "three"])
+ self.assertEquals(self.ls.remove(["two", "three"]),
+ ["two", "three"])
def testNoDoubleAdd(self):
- self.assertRaises(errors.LockError, self.ls.add, 'two')
- self.ls.add('four')
- self.assertRaises(errors.LockError, self.ls.add, 'four')
+ self.assertRaises(errors.LockError, self.ls.add, "two")
+ self.ls.add("four")
+ self.assertRaises(errors.LockError, self.ls.add, "four")
def testNoWrongRemoves(self):
- self.ls.acquire(['one', 'three'], shared=1)
- # Cannot remove 'two' while holding something which is not a superset
- self.assertRaises(AssertionError, self.ls.remove, 'two')
- # Cannot remove 'three' as we are sharing it
- self.assertRaises(AssertionError, self.ls.remove, 'three')
+ self.ls.acquire(["one", "three"], shared=1)
+ # Cannot remove "two" while holding something which is not a superset
+ self.assertRaises(AssertionError, self.ls.remove, "two")
+ # Cannot remove "three" as we are sharing it
+ self.assertRaises(AssertionError, self.ls.remove, "three")
def testAcquireSetLock(self):
# acquire the set-lock exclusively
- self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
- self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
- self.assertEquals(self.ls._is_owned(), True)
- self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
+ self.assertEquals(self.ls.acquire(None), set(["one", "two", "three"]))
+ self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
+ self.assertEquals(self.ls.is_owned(), True)
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
# I can still add/remove elements...
- self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
- self.assert_(self.ls.add('six'))
+ self.assertEquals(self.ls.remove(["two", "three"]), ["two", "three"])
+ self.assert_(self.ls.add("six"))
self.ls.release()
# share the set-lock
- self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
+ self.assertEquals(self.ls.acquire(None, shared=1), set(["one", "six"]))
# adding new elements is not possible
- self.assertRaises(AssertionError, self.ls.add, 'five')
+ self.assertRaises(AssertionError, self.ls.add, "five")
self.ls.release()
def testAcquireWithRepetitions(self):
- self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
- set(['two', 'two', 'three']))
- self.ls.release(['two', 'two'])
- self.assertEquals(self.ls._list_owned(), set(['three']))
+ self.assertEquals(self.ls.acquire(["two", "two", "three"], shared=1),
+ set(["two", "two", "three"]))
+ self.ls.release(["two", "two"])
+ self.assertEquals(self.ls.list_owned(), set(["three"]))
def testEmptyAcquire(self):
# Acquire an empty list of locks...
self.assertEquals(self.ls.acquire([]), set())
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
# New locks can still be addded
- self.assert_(self.ls.add('six'))
+ self.assert_(self.ls.add("six"))
# "re-acquiring" is not an issue, since we had really acquired nothing
self.assertEquals(self.ls.acquire([], shared=1), set())
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
# We haven't really acquired anything, so we cannot release
self.assertRaises(AssertionError, self.ls.release)
def _doLockSet(self, names, shared):
try:
self.ls.acquire(names, shared=shared)
- self.done.put('DONE')
+ self.done.put("DONE")
self.ls.release()
except errors.LockError:
- self.done.put('ERR')
+ self.done.put("ERR")
def _doAddSet(self, names):
try:
self.ls.add(names, acquired=1)
- self.done.put('DONE')
+ self.done.put("DONE")
self.ls.release()
except errors.LockError:
- self.done.put('ERR')
+ self.done.put("ERR")
def _doRemoveSet(self, names):
self.done.put(self.ls.remove(names))
@_Repeat
def testConcurrentSharedAcquire(self):
- self.ls.acquire(['one', 'two'], shared=1)
- self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+ self.ls.acquire(["one", "two"], shared=1)
+ self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self._addThread(target=self._doLockSet, args=(["one", "two", "three"], 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self._addThread(target=self._doLockSet, args=('three', 1))
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self._addThread(target=self._doLockSet, args=("three", 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
- self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
+ self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self.assertEqual(self.done.get_nowait(), "DONE")
@_Repeat
def testConcurrentExclusiveAcquire(self):
- self.ls.acquire(['one', 'two'])
- self._addThread(target=self._doLockSet, args=('three', 1))
+ self.ls.acquire(["one", "two"])
+ self._addThread(target=self._doLockSet, args=("three", 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self._addThread(target=self._doLockSet, args=('three', 0))
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self._addThread(target=self._doLockSet, args=("three", 0))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
- self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
- self._addThread(target=self._doLockSet, args=('one', 0))
- self._addThread(target=self._doLockSet, args=('one', 1))
- self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
- self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
+ self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
+ self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
+ self._addThread(target=self._doLockSet, args=("one", 0))
+ self._addThread(target=self._doLockSet, args=("one", 1))
+ self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
+ self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
for _ in range(6):
- self.failUnlessEqual(self.done.get_nowait(), 'DONE')
+ self.failUnlessEqual(self.done.get_nowait(), "DONE")
+
+ @_Repeat
+ def testSimpleAcquireTimeoutExpiring(self):
+ names = sorted(self.ls._names())
+ self.assert_(len(names) >= 3)
+
+ # Get name of first lock
+ first = names[0]
+
+ # Get name of last lock
+ last = names.pop()
+
+ checks = [
+ # Block first and try to lock it again
+ (first, first),
+
+ # Block last and try to lock all locks
+ (None, first),
+
+ # Block last and try to lock it again
+ (last, last),
+ ]
+
+ for (wanted, block) in checks:
+ # Lock in exclusive mode
+ self.assert_(self.ls.acquire(block, shared=0))
+
+ def _AcquireOne():
+ # Try to get the same lock again with a timeout (should never succeed)
+ acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
+ if acquired:
+ self.done.put("acquired")
+ self.ls.release()
+ else:
+ self.assert_(acquired is None)
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls.is_owned())
+ self.done.put("not acquired")
+
+ self._addThread(target=_AcquireOne)
+
+ # Wait for timeout in thread to expire
+ self._waitThreads()
+
+ # Release exclusive lock again
+ self.ls.release()
+
+ self.assertEqual(self.done.get_nowait(), "not acquired")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ @_Repeat
+ def testDelayedAndExpiringLockAcquire(self):
+ self._setUpLS()
+ self.ls.add(["five", "six", "seven", "eight", "nine"])
+
+ for expire in (False, True):
+ names = sorted(self.ls._names())
+ self.assertEqual(len(names), 8)
+
+ lock_ev = dict([(i, threading.Event()) for i in names])
+
+ # Lock all in exclusive mode
+ self.assert_(self.ls.acquire(names, shared=0))
+
+ if expire:
+ # We'll wait at least 300ms per lock
+ lockwait = len(names) * [0.3]
+
+ # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
+ # this gives us up to 2.4s to fail.
+ lockall_timeout = 0.4
+ else:
+ # This should finish rather quickly
+ lockwait = None
+ lockall_timeout = len(names) * 5.0
+
+ def _LockAll():
+ def acquire_notification(name):
+ if not expire:
+ self.done.put("getting %s" % name)
+
+ # Kick next lock
+ lock_ev[name].set()
+
+ if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
+ test_notify=acquire_notification):
+ self.done.put("got all")
+ self.ls.release()
+ else:
+ self.done.put("timeout on all")
+
+ # Notify all locks
+ for ev in lock_ev.values():
+ ev.set()
+
+ t = self._addThread(target=_LockAll)
+
+ for idx, name in enumerate(names):
+ # Wait for actual acquire on this lock to start
+ lock_ev[name].wait(10.0)
+
+ if expire and t.isAlive():
+ # Wait some time after getting the notification to make sure the lock
+ # acquire will expire
+ SafeSleep(lockwait[idx])
+
+ self.ls.release(names=name)
+
+ self.assertFalse(self.ls.list_owned())
+
+ self._waitThreads()
+
+ if expire:
+ # Not checking which locks were actually acquired. Doing so would be
+ # too timing-dependant.
+ self.assertEqual(self.done.get_nowait(), "timeout on all")
+ else:
+ for i in names:
+ self.assertEqual(self.done.get_nowait(), "getting %s" % i)
+ self.assertEqual(self.done.get_nowait(), "got all")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testConcurrentRemove(self):
- self.ls.add('four')
- self.ls.acquire(['one', 'two', 'four'])
- self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
- self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
- self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
- self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+ self.ls.add("four")
+ self.ls.acquire(["one", "two", "four"])
+ self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
+ self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
+ self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
+ self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self.ls.remove('one')
+ self.ls.remove("one")
self.ls.release()
self._waitThreads()
for i in range(4):
- self.failUnlessEqual(self.done.get_nowait(), 'ERR')
- self.ls.add(['five', 'six'], acquired=1)
- self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
- self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
- self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
- self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
- self.ls.remove('five')
+ self.failUnlessEqual(self.done.get_nowait(), "ERR")
+ self.ls.add(["five", "six"], acquired=1)
+ self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
+ self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
+ self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
+ self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
+ self.ls.remove("five")
self.ls.release()
self._waitThreads()
for i in range(4):
- self.failUnlessEqual(self.done.get_nowait(), 'DONE')
- self.ls.acquire(['three', 'four'])
- self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
+ self.failUnlessEqual(self.done.get_nowait(), "DONE")
+ self.ls.acquire(["three", "four"])
+ self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self.ls.remove('four')
+ self.ls.remove("four")
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), ['six'])
- self._addThread(target=self._doRemoveSet, args=(['two']))
+ self.assertEqual(self.done.get_nowait(), ["six"])
+ self._addThread(target=self._doRemoveSet, args=(["two"]))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), ['two'])
+ self.assertEqual(self.done.get_nowait(), ["two"])
self.ls.release()
# reset lockset
self._setUpLS()
# ...another thread can share it too
self._addThread(target=self._doLockSet, args=(None, 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
# ...or just share some elements
- self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
+ self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
# ...but not add new ones or remove any
- t = self._addThread(target=self._doAddSet, args=(['nine']))
- self._addThread(target=self._doRemoveSet, args=(['two'], ))
+ t = self._addThread(target=self._doAddSet, args=(["nine"]))
+ self._addThread(target=self._doRemoveSet, args=(["two"], ))
self.assertRaises(Queue.Empty, self.done.get_nowait)
# this just releases the set-lock
self.ls.release([])
t.join(60)
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
# release the lock on the actual elements so remove() can proceed too
self.ls.release()
self._waitThreads()
- self.failUnlessEqual(self.done.get_nowait(), ['two'])
+ self.failUnlessEqual(self.done.get_nowait(), ["two"])
# reset lockset
self._setUpLS()
# ...no one can do anything else
self._addThread(target=self._doLockSet, args=(None, 1))
self._addThread(target=self._doLockSet, args=(None, 0))
- self._addThread(target=self._doLockSet, args=(['three'], 0))
- self._addThread(target=self._doLockSet, args=(['two'], 1))
- self._addThread(target=self._doAddSet, args=(['nine']))
+ self._addThread(target=self._doLockSet, args=(["three"], 0))
+ self._addThread(target=self._doLockSet, args=(["two"], 1))
+ self._addThread(target=self._doAddSet, args=(["nine"]))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
for _ in range(5):
- self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), "DONE")
# cleanup
self._setUpLS()
@_Repeat
def testConcurrentSetLockAdd(self):
- self.ls.acquire('one')
+ self.ls.acquire("one")
# Another thread wants the whole SetLock
self._addThread(target=self._doLockSet, args=(None, 0))
self._addThread(target=self._doLockSet, args=(None, 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self.assertRaises(AssertionError, self.ls.add, 'four')
+ self.assertRaises(AssertionError, self.ls.add, "four")
self.ls.release()
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self.assertEqual(self.done.get_nowait(), "DONE")
self.ls.acquire(None)
self._addThread(target=self._doLockSet, args=(None, 0))
self._addThread(target=self._doLockSet, args=(None, 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self.ls.add('four')
- self.ls.add('five', acquired=1)
- self.ls.add('six', acquired=1, shared=1)
- self.assertEquals(self.ls._list_owned(),
- set(['one', 'two', 'three', 'five', 'six']))
- self.assertEquals(self.ls._is_owned(), True)
+ self.ls.add("four")
+ self.ls.add("five", acquired=1)
+ self.ls.add("six", acquired=1, shared=1)
+ self.assertEquals(self.ls.list_owned(),
+ set(["one", "two", "three", "five", "six"]))
+ self.assertEquals(self.ls.is_owned(), True)
self.assertEquals(self.ls._names(),
- set(['one', 'two', 'three', 'four', 'five', 'six']))
+ set(["one", "two", "three", "four", "five", "six"]))
self.ls.release()
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self.assertEqual(self.done.get_nowait(), "DONE")
self._setUpLS()
@_Repeat
def testEmptyLockSet(self):
# get the set-lock
- self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
+ self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
# now empty it...
- self.ls.remove(['one', 'two', 'three'])
+ self.ls.remove(["one", "two", "three"])
+ self.assertFalse(self.ls._names())
# and adds/locks by another thread still wait
- self._addThread(target=self._doAddSet, args=(['nine']))
+ self._addThread(target=self._doAddSet, args=(["nine"]))
self._addThread(target=self._doLockSet, args=(None, 1))
self._addThread(target=self._doLockSet, args=(None, 0))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
for _ in range(3):
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
# empty it again...
- self.assertEqual(self.ls.remove(['nine']), ['nine'])
+ self.assertEqual(self.ls.remove(["nine"]), ["nine"])
# now share it...
self.assertEqual(self.ls.acquire(None, shared=1), set())
# other sharers can go, adds still wait
self._addThread(target=self._doLockSet, args=(None, 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self._addThread(target=self._doAddSet, args=(['nine']))
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self._addThread(target=self._doAddSet, args=(["nine"]))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
self._setUpLS()
+ def testAcquireWithNamesDowngrade(self):
+ self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
+ self.assertTrue(self.ls.is_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.ls.release()
+ self.assertFalse(self.ls.is_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+ # Can't downgrade after releasing
+ self.assertRaises(AssertionError, self.ls.downgrade, "two")
+
+ def testDowngrade(self):
+ # Not owning anything, must raise an exception
+ self.assertFalse(self.ls.is_owned())
+ self.assertRaises(AssertionError, self.ls.downgrade)
+
+ self.assertFalse(compat.any(i.is_owned()
+ for i in self.ls._get_lockdict().values()))
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
+ for name in self.ls._names():
+ self.assertFalse(self.ls.check_owned(name))
+
+ self.assertEquals(self.ls.acquire(None, shared=0),
+ set(["one", "two", "three"]))
+ self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
+
+ self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name))
+ self.assertTrue(self.ls.check_owned(name, shared=0))
+ self.assertFalse(self.ls.check_owned(name, shared=1))
+
+ self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+ self.assertTrue(compat.all(i.is_owned(shared=0)
+ for i in self.ls._get_lockdict().values()))
+
+ # Start downgrading locks
+ self.assertTrue(self.ls.downgrade(names=["one"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+ self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
+ for name, lock in
+ self.ls._get_lockdict().items()))
+
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertTrue(self.ls.check_owned("two", shared=0))
+ self.assertTrue(self.ls.check_owned("three", shared=0))
+
+ # Downgrade second lock
+ self.assertTrue(self.ls.downgrade(names="two"))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+ should_share = lambda name: [0, 1][int(name in ("one", "two"))]
+ self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
+ for name, lock in
+ self.ls._get_lockdict().items()))
+
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertFalse(self.ls.check_owned("two", shared=0))
+ self.assertTrue(self.ls.check_owned("two", shared=1))
+ self.assertTrue(self.ls.check_owned("three", shared=0))
+
+ # Downgrading the last exclusive lock to shared must downgrade the
+ # lockset-internal lock too
+ self.assertTrue(self.ls.downgrade(names="three"))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.assertTrue(compat.all(i.is_owned(shared=1)
+ for i in self.ls._get_lockdict().values()))
+
+ # Verify owned locks
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name, shared=1))
+
+ # Downgrading a shared lock must be a no-op
+ self.assertTrue(self.ls.downgrade(names=["one", "three"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.assertTrue(compat.all(i.is_owned(shared=1)
+ for i in self.ls._get_lockdict().values()))
-class TestGanetiLockManager(_ThreadedTestCase):
+ self.ls.release()
+
+ def testDowngradeEverything(self):
+ self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
+ set(["one", "two", "three"]))
+ self.assertTrue(self.ls.owning_all())
+
+ # Ensure all locks are now owned in exclusive mode
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name, shared=0))
+
+ # Downgrade everything
+ self.assertTrue(self.ls.downgrade())
+
+ # Ensure all locks are now owned in shared mode
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name, shared=1))
+
+ self.assertTrue(self.ls.owning_all())
+
+ def testPriority(self):
+ def _Acquire(prev, next, name, priority, success_fn):
+ prev.wait()
+ self.assert_(self.ls.acquire(name, shared=0,
+ priority=priority,
+ test_notify=lambda _: next.set()))
+ try:
+ success_fn()
+ finally:
+ self.ls.release()
+
+ # Get all in exclusive mode
+ self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
+
+ done_two = Queue.Queue(0)
+
+ first = threading.Event()
+ prev = first
+
+ acquires = [("one", prio, self.done) for prio in range(1, 33)]
+ acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
+
+ # Use a deterministic random generator
+ random.Random(741).shuffle(acquires)
+
+ for (name, prio, done) in acquires:
+ ev = threading.Event()
+ self._addThread(target=_Acquire,
+ args=(prev, ev, name, prio,
+ compat.partial(done.put, "Prio%s" % prio)))
+ prev = ev
+
+ # Start acquires
+ first.set()
+
+ # Wait for last acquire to start
+ prev.wait()
+
+ # Let threads acquire locks
+ self.ls.release()
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ for i in range(1, 33):
+ self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
+ self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
+
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+ self.assertRaises(Queue.Empty, done_two.get_nowait)
+
+ def testNamesWithOpportunisticAndTimeout(self):
+ self.assertRaises(AssertionError, self.ls.acquire,
+ ["one", "two"], timeout=1.0, opportunistic=True)
+
+ def testOpportunisticWithUnknownName(self):
+ name = "unknown"
+ self.assertFalse(name in self.ls._names())
+ result = self.ls.acquire(name, opportunistic=True)
+ self.assertFalse(result)
+ self.assertFalse(self.ls.list_owned())
+
+ result = self.ls.acquire(["two", name], opportunistic=True)
+ self.assertEqual(result, set(["two"]))
+ self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+ self.ls.release()
+
+ def testSimpleOpportunisticAcquisition(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold a lock in main thread
+ self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
+
+ def fn():
+ # The lock "two" is held by the main thread
+ result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
+ self.assertEqual(result, set(["one"]))
+ self.assertEqual(self.ls.list_owned(), set(["one"]))
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+
+ # Try to acquire the lock held by the main thread
+ result = self.ls.acquire(["two"], shared=0, opportunistic=True)
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.assertFalse(result)
+ self.assertFalse(self.ls.list_owned())
+
+ # Try to acquire all locks
+ result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
+ self.assertTrue(self.ls._get_lock().is_owned(),
+ msg="Internal lock is not owned")
+ self.assertEqual(result, set(["one", "three"]))
+ self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+ self.ls.release()
+
+ self.assertFalse(self.ls.list_owned())
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testOpportunisticAcquisitionWithoutNamesExpires(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=0)
+ self.assertTrue(self.ls._get_lock().is_owned())
+
+ def fn():
+ # Try to acquire all locks in separate thread
+ result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
+ timeout=0.1)
+ self.assertFalse(result)
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.assertFalse(self.ls.list_owned())
+
+ # Try once more without a timeout
+ self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned(shared=0))
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testSharedOpportunisticAcquisitionWithoutNames(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=1)
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ def fn():
+ # Try to acquire all locks in separate thread in shared mode
+ result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
+ timeout=0.1)
+ self.assertEqual(result, set(["one", "two", "three"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.ls.release()
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ # Try one in exclusive mode
+ self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testLockDeleteWithOpportunisticAcquisition(self):
+ # This test exercises some code handling LockError on acquisition, that is
+ # after all lock names have been gathered. This shouldn't happen in reality
+ # as removing locks from the set requires the lockset-internal lock, but
+ # the code should handle the situation anyway.
+ ready = threading.Event()
+ finished = threading.Event()
+
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Thread function to delete lock
+ def fn():
+ # Wait for notification
+ ready.wait()
+
+ # Delete lock named "two" by accessing lockset-internal data
+ ld = self.ls._get_lockdict()
+ self.assertTrue(ld["two"].delete())
+
+ self.done.put("deleted.two")
+
+ # Notify helper
+ finished.set()
+
+ self._addThread(target=fn)
+
+ # Notification helper, called when lock already holds internal lock.
+ # Therefore only one of the locks not yet locked can be deleted.
+ def notify(name):
+ self.done.put("notify.%s" % name)
+
+ if name == "one":
+ # Tell helper thread to delete lock "two"
+ ready.set()
+ finished.wait()
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
+ self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ # Release all locks
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertEqual(self.done.get_nowait(), "notify.one")
+ self.assertEqual(self.done.get_nowait(), "deleted.two")
+ self.assertEqual(self.done.get_nowait(), "notify.three")
+ self.assertEqual(self.done.get_nowait(), "notify.two")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
+ def setUp(self):
+ self.fn = locking._GetLsAcquireModeAndTimeouts
+
+ def testOpportunisticWithoutNames(self):
+ (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
+ self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+ self.assertTrue(ls_timeout_fn is None)
+ self.assertEqual(timeout_fn(), 0)
+
+ def testAllInputCombinations(self):
+ for want_all in [False, True]:
+ for timeout in [None, 0, 100]:
+ for opportunistic in [False, True]:
+ if (opportunistic and
+ not want_all and
+ timeout is not None):
+ # Can't accept a timeout when acquiring opportunistically
+ self.assertRaises(AssertionError, self.fn,
+ want_all, timeout, opportunistic)
+ else:
+ (mode, ls_timeout_fn, timeout_fn) = \
+ self.fn(want_all, timeout, opportunistic)
+
+ if opportunistic:
+ self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+ self.assertEqual(timeout_fn(), 0)
+ else:
+ self.assertTrue(callable(timeout_fn))
+ if want_all:
+ self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
+ else:
+ self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
+
+ if want_all:
+ self.assertTrue(callable(ls_timeout_fn))
+ else:
+ self.assertTrue(ls_timeout_fn is None)
+
+
+class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
_ThreadedTestCase.setUp(self)
- self.nodes=['n1', 'n2']
- self.instances=['i1', 'i2', 'i3']
- self.GL = locking.GanetiLockManager(nodes=self.nodes,
- instances=self.instances)
- self.done = Queue.Queue(0)
+ self.nodes = ["n1", "n2"]
+ self.nodegroups = ["g1", "g2"]
+ self.instances = ["i1", "i2", "i3"]
+ self.networks = ["net1", "net2", "net3"]
+ self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
+ self.instances, self.networks)
def tearDown(self):
# Don't try this at home...
self.assertEqual(i, locking.LEVELS[i])
def testDoubleGLFails(self):
- self.assertRaises(AssertionError, locking.GanetiLockManager)
+ self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
def testLockNames(self):
- self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
+ self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+ set(self.nodegroups))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
+ self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
+ set(self.networks))
def testInitAndResources(self):
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager()
- self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
+ self.GL = locking.GanetiLockManager([], [], [], [])
+ self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager(nodes=self.nodes)
- self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
+ self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
+ self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+ set(self.nodegroups))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager(instances=self.instances)
- self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
+ self.GL = locking.GanetiLockManager([], [], self.instances, [])
+ self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
+ locking.GanetiLockManager._instance = None
+ self.GL = locking.GanetiLockManager([], [], [], self.networks)
+ self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
+ set(self.networks))
+
def testAcquireRelease(self):
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
- self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
- self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
- self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
- self.GL.release(locking.LEVEL_NODE, ['n2'])
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
+ self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
+ self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
+ self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
+ shared=1))
+ self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
+ self.GL.release(locking.LEVEL_NODE, ["n2"])
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
self.GL.release(locking.LEVEL_NODE)
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
+ self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.release(locking.LEVEL_INSTANCE)
self.assertRaises(errors.LockError, self.GL.acquire,
- locking.LEVEL_INSTANCE, ['i5'])
- self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
+ locking.LEVEL_INSTANCE, ["i5"])
+ self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))
def testAcquireWholeSets(self):
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
+ self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
+ set(self.nodegroups))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
+ set(self.nodegroups))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
set(self.nodes))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
set(self.nodes))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))
self.GL.release(locking.LEVEL_NODE)
+ self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
def testAcquireWholeAndPartial(self):
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
+ self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
- self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
- set(['n2']))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
- set(['n2']))
+ self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
+ set(["n2"]))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
+ set(["n2"]))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
+ self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
def testBGLDependency(self):
self.assertRaises(AssertionError, self.GL.acquire,
- locking.LEVEL_NODE, ['n1', 'n2'])
+ locking.LEVEL_NODE, ["n1", "n2"])
+ self.assertRaises(AssertionError, self.GL.acquire,
+ locking.LEVEL_INSTANCE, ["i3"])
self.assertRaises(AssertionError, self.GL.acquire,
- locking.LEVEL_INSTANCE, ['i3'])
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
- self.GL.acquire(locking.LEVEL_NODE, ['n1'])
+ locking.LEVEL_NODEGROUP, ["g1"])
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+ self.GL.acquire(locking.LEVEL_NODE, ["n1"])
self.assertRaises(AssertionError, self.GL.release,
- locking.LEVEL_CLUSTER, ['BGL'])
+ locking.LEVEL_CLUSTER, ["BGL"])
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER)
self.GL.release(locking.LEVEL_NODE)
- self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
+ self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
self.assertRaises(AssertionError, self.GL.release,
- locking.LEVEL_CLUSTER, ['BGL'])
+ locking.LEVEL_CLUSTER, ["BGL"])
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER)
self.GL.release(locking.LEVEL_INSTANCE)
+ self.GL.acquire(locking.LEVEL_NODEGROUP, None)
+ self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
+ self.assertRaises(AssertionError, self.GL.release,
+ locking.LEVEL_CLUSTER, ["BGL"])
+ self.assertRaises(AssertionError, self.GL.release,
+ locking.LEVEL_CLUSTER)
+ self.GL.release(locking.LEVEL_NODEGROUP)
+ self.GL.release(locking.LEVEL_CLUSTER)
def testWrongOrder(self):
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
- self.GL.acquire(locking.LEVEL_NODE, ['n2'])
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+ self.GL.acquire(locking.LEVEL_NODE, ["n2"])
+ self.assertRaises(AssertionError, self.GL.acquire,
+ locking.LEVEL_NODE, ["n1"])
self.assertRaises(AssertionError, self.GL.acquire,
- locking.LEVEL_NODE, ['n1'])
+ locking.LEVEL_NODEGROUP, ["g1"])
self.assertRaises(AssertionError, self.GL.acquire,
- locking.LEVEL_INSTANCE, ['i2'])
+ locking.LEVEL_INSTANCE, ["i2"])
+
+ def testModifiableLevels(self):
+ self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
+ ["BGL2"])
+ self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC,
+ ["NAL2"])
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
+ self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
+ self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
+ self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
+ self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
+ self.GL.add(locking.LEVEL_NODE, ["n3"])
+ self.GL.remove(locking.LEVEL_NODE, ["n1"])
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
+ self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
+ self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
+ self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))
+ self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
+ ["BGL2"])
# Helper function to run as a thread that shared the BGL and then acquires
# some locks at another level.
def _doLock(self, level, names, shared):
try:
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
self.GL.acquire(level, names, shared=shared)
- self.done.put('DONE')
+ self.done.put("DONE")
self.GL.release(level)
self.GL.release(locking.LEVEL_CLUSTER)
except errors.LockError:
- self.done.put('ERR')
+ self.done.put("ERR")
@_Repeat
def testConcurrency(self):
- self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
+ self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
self._addThread(target=self._doLock,
- args=(locking.LEVEL_INSTANCE, 'i1', 1))
+ args=(locking.LEVEL_INSTANCE, "i1", 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
self._addThread(target=self._doLock,
- args=(locking.LEVEL_INSTANCE, 'i1', 1))
+ args=(locking.LEVEL_INSTANCE, "i1", 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
self._addThread(target=self._doLock,
- args=(locking.LEVEL_INSTANCE, 'i3', 1))
+ args=(locking.LEVEL_INSTANCE, "i3", 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.GL.release(locking.LEVEL_INSTANCE)
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
- self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
+ self.assertEqual(self.done.get_nowait(), "DONE")
+ self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
self._addThread(target=self._doLock,
- args=(locking.LEVEL_INSTANCE, 'i2', 1))
+ args=(locking.LEVEL_INSTANCE, "i2", 1))
self._waitThreads()
- self.assertEqual(self.done.get_nowait(), 'DONE')
+ self.assertEqual(self.done.get_nowait(), "DONE")
self._addThread(target=self._doLock,
- args=(locking.LEVEL_INSTANCE, 'i2', 0))
+ args=(locking.LEVEL_INSTANCE, "i2", 0))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.GL.release(locking.LEVEL_INSTANCE)
self._waitThreads()
- self.assertEqual(self.done.get(True, 1), 'DONE')
- self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
+ self.assertEqual(self.done.get(True, 1), "DONE")
+ self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
+
+
+class TestLockMonitor(_ThreadedTestCase):
+ def setUp(self):
+ _ThreadedTestCase.setUp(self)
+ self.lm = locking.LockMonitor()
+
+ def testSingleThread(self):
+ locks = []
+
+ for i in range(100):
+ name = "TestLock%s" % i
+ locks.append(locking.SharedLock(name, monitor=self.lm))
+
+ self.assertEqual(len(self.lm._locks), len(locks))
+ result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+ self.assertEqual(len(result.fields), 1)
+ self.assertEqual(len(result.data), 100)
+
+ # Delete all locks
+ del locks[:]
+
+ # The garbage collector might needs some time
+ def _CheckLocks():
+ if self.lm._locks:
+ raise utils.RetryAgain()
+
+ utils.Retry(_CheckLocks, 0.1, 30.0)
+
+ self.assertFalse(self.lm._locks)
+
+ def testMultiThread(self):
+ locks = []
+
+ def _CreateLock(prev, next, name):
+ prev.wait()
+ locks.append(locking.SharedLock(name, monitor=self.lm))
+ if next:
+ next.set()
+
+ expnames = []
+
+ first = threading.Event()
+ prev = first
+
+ # Use a deterministic random generator
+ for i in random.Random(4263).sample(range(100), 33):
+ name = "MtTestLock%s" % i
+ expnames.append(name)
+
+ ev = threading.Event()
+ self._addThread(target=_CreateLock, args=(prev, ev, name))
+ prev = ev
+
+ # Add locks
+ first.set()
+ self._waitThreads()
+
+ # Check order in which locks were added
+ self.assertEqual([i.name for i in locks], expnames)
+
+ # Check query result
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assert_(isinstance(result, dict))
+ response = objects.QueryResponse.FromDict(result)
+ self.assertEqual(response.data,
+ [[(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, [])]
+ for name in utils.NiceSort(expnames)])
+ self.assertEqual(len(response.fields), 4)
+ self.assertEqual(["name", "mode", "owner", "pending"],
+ [fdef.name for fdef in response.fields])
+
+ # Test exclusive acquire
+ for tlock in locks[::4]:
+ tlock.acquire(shared=0)
+ try:
+ def _GetExpResult(name):
+ if tlock.name == name:
+ return [(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.RS_NORMAL, [])]
+ return [(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, [])]
+
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [_GetExpResult(name)
+ for name in utils.NiceSort(expnames)])
+ finally:
+ tlock.release()
+
+ # Test shared acquire
+ def _Acquire(lock, shared, ev, notify):
+ lock.acquire(shared=shared)
+ try:
+ notify.set()
+ ev.wait()
+ finally:
+ lock.release()
+
+ for tlock1 in locks[::11]:
+ for tlock2 in locks[::-15]:
+ if tlock2 == tlock1:
+ # Avoid deadlocks
+ continue
+
+ for tlock3 in locks[::10]:
+ if tlock3 in (tlock2, tlock1):
+ # Avoid deadlocks
+ continue
+
+ releaseev = threading.Event()
+
+ # Acquire locks
+ acquireev = []
+ tthreads1 = []
+ for i in range(3):
+ ev = threading.Event()
+ tthreads1.append(self._addThread(target=_Acquire,
+ args=(tlock1, 1, releaseev, ev)))
+ acquireev.append(ev)
+
+ ev = threading.Event()
+ tthread2 = self._addThread(target=_Acquire,
+ args=(tlock2, 1, releaseev, ev))
+ acquireev.append(ev)
+
+ ev = threading.Event()
+ tthread3 = self._addThread(target=_Acquire,
+ args=(tlock3, 0, releaseev, ev))
+ acquireev.append(ev)
+
+ # Wait for all locks to be acquired
+ for i in acquireev:
+ i.wait()
+
+ # Check query result
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ response = objects.QueryResponse.FromDict(result)
+ for (name, mode, owner) in response.data:
+ (name_status, name_value) = name
+ (owner_status, owner_value) = owner
+
+ self.assertEqual(name_status, constants.RS_NORMAL)
+ self.assertEqual(owner_status, constants.RS_NORMAL)
+
+ if name_value == tlock1.name:
+ self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+ self.assertEqual(set(owner_value),
+ set(i.getName() for i in tthreads1))
+ continue
+
+ if name_value == tlock2.name:
+ self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+ self.assertEqual(owner_value, [tthread2.getName()])
+ continue
+
+ if name_value == tlock3.name:
+ self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
+ self.assertEqual(owner_value, [tthread3.getName()])
+ continue
+
+ self.assert_(name_value in expnames)
+ self.assertEqual(mode, (constants.RS_NORMAL, None))
+ self.assert_(owner_value is None)
+
+ # Release locks again
+ releaseev.set()
+
+ self._waitThreads()
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]
+ for name in utils.NiceSort(expnames)])
+
+ def testDelete(self):
+ lock = locking.SharedLock("TestLock", monitor=self.lm)
+
+ self.assertEqual(len(self.lm._locks), 1)
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]])
+
+ lock.delete()
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, "deleted"),
+ (constants.RS_NORMAL, None)]])
+ self.assertEqual(len(self.lm._locks), 1)
+
+ def testPending(self):
+ def _Acquire(lock, shared, prev, next):
+ prev.wait()
+
+ lock.acquire(shared=shared, test_notify=next.set)
+ try:
+ pass
+ finally:
+ lock.release()
+
+ lock = locking.SharedLock("ExcLock", monitor=self.lm)
+
+ for shared in [0, 1]:
+ lock.acquire()
+ try:
+ self.assertEqual(len(self.lm._locks), 1)
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL,
+ [threading.currentThread().getName()])]])
+
+ threads = []
+
+ first = threading.Event()
+ prev = first
+
+ for i in range(5):
+ ev = threading.Event()
+ threads.append(self._addThread(target=_Acquire,
+ args=(lock, shared, prev, ev)))
+ prev = ev
+
+ # Start acquires
+ first.set()
+
+ # Wait for last acquire to start waiting
+ prev.wait()
+
+ # NOTE: This works only because QueryLocks will acquire the
+ # lock-internal lock again and won't be able to get the information
+ # until it has the lock. By then the acquire should be registered in
+ # SharedLock.__pending (otherwise it's a bug).
+
+ # All acquires are waiting now
+ if shared:
+ pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
+ else:
+ pending = [("exclusive", [t.getName()]) for t in threads]
+
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.RS_NORMAL, pending)]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+ finally:
+ lock.release()
+
+ self._waitThreads()
-if __name__ == '__main__':
- unittest.main()
- #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
- #unittest.TextTestRunner(verbosity=2).run(suite)
+ # No pending acquires
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, [])]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+
+ def testDeleteAndRecreate(self):
+ lname = "TestLock101923193"
+
+ # Create some locks with the same name and keep all references
+ locks = [locking.SharedLock(lname, monitor=self.lm)
+ for _ in range(5)]
+
+ self.assertEqual(len(self.lm._locks), len(locks))
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]] * 5)
+
+ locks[2].delete()
+
+ # Check information order
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]] * 2 +
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, "deleted"),
+ (constants.RS_NORMAL, None)]] +
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]] * 2)
+
+ locks[1].acquire(shared=0)
+
+ last_status = [
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL, [threading.currentThread().getName()])],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, "deleted"),
+ (constants.RS_NORMAL, None)],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)],
+ ]
+
+ # Check information order
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
+
+ self.assertEqual(len(set(self.lm._locks.values())), len(locks))
+ self.assertEqual(len(self.lm._locks), len(locks))
+
+ # Check lock deletion
+ for idx in range(len(locks)):
+ del locks[0]
+ assert gc.isenabled()
+ gc.collect()
+ self.assertEqual(len(self.lm._locks), len(locks))
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ last_status[idx + 1:])
+
+ # All locks should have been deleted
+ assert not locks
+ self.assertFalse(self.lm._locks)
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+
+ class _FakeLock:
+ def __init__(self):
+ self._info = []
+
+ def AddResult(self, *args):
+ self._info.append(args)
+
+ def CountPending(self):
+ return len(self._info)
+
+ def GetLockInfo(self, requested):
+ (exp_requested, result) = self._info.pop(0)
+
+ if exp_requested != requested:
+ raise Exception("Requested information (%s) does not match"
+ " expectations (%s)" % (requested, exp_requested))
+
+ return result
+
+ def testMultipleResults(self):
+ fl1 = self._FakeLock()
+ fl2 = self._FakeLock()
+
+ self.lm.RegisterLock(fl1)
+ self.lm.RegisterLock(fl2)
+
+ # Empty information
+ for i in [fl1, fl2]:
+ i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+ for i in [fl1, fl2]:
+ self.assertEqual(i.CountPending(), 0)
+
+ # Check ordering
+ for fn in [lambda x: x, reversed, sorted]:
+ fl1.AddResult(set(), list(fn([
+ ("aaa", None, None, None),
+ ("bbb", None, None, None),
+ ])))
+ fl2.AddResult(set(), [])
+ result = self.lm.QueryLocks(["name"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+ [(constants.RS_NORMAL, "aaa")],
+ [(constants.RS_NORMAL, "bbb")],
+ ])
+ for i in [fl1, fl2]:
+ self.assertEqual(i.CountPending(), 0)
+
+ for fn2 in [lambda x: x, reversed, sorted]:
+ fl1.AddResult(set([query.LQ_MODE]), list(fn([
+ # Same name, but different information
+ ("aaa", "mode0", None, None),
+ ("aaa", "mode1", None, None),
+ ("aaa", "mode2", None, None),
+ ("aaa", "mode3", None, None),
+ ])))
+ fl2.AddResult(set([query.LQ_MODE]), [
+ ("zzz", "end", None, None),
+ ("000", "start", None, None),
+ ] + list(fn2([
+ ("aaa", "b200", None, None),
+ ("aaa", "b300", None, None),
+ ])))
+ result = self.lm.QueryLocks(["name", "mode"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+ [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
+ ] + list(fn([
+ # Name is the same, so order must be equal to incoming order
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
+ ])) + list(fn2([
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
+ ])) + [
+ [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
+ ])
+ for i in [fl1, fl2]:
+ self.assertEqual(i.CountPending(), 0)
+
+
+if __name__ == "__main__":
+ testutils.GanetiTestProgram()