self.cond.acquire()
self.assertEqual(len(self.cond._waiters), 3)
self.assertEqual(self.cond._waiters, set(threads))
+
+ self.assertTrue(repr(self.cond).startswith("<"))
+ self.assertTrue("waiters=" in repr(self.cond))
+
# This new thread can't acquire the lock, and thus call wait, before we
# release it
self._addThread(target=fn)
_ThreadedTestCase.setUp(self)
self.sl = locking.SharedLock("TestSharedLock")
+ self.assertTrue(repr(self.sl).startswith("<"))
+ self.assertTrue("name=TestSharedLock" in repr(self.sl))
+
def testSequenceAndOwnership(self):
self.assertFalse(self.sl.is_owned())
self.sl.acquire(shared=1)
self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
# now empty it...
self.ls.remove(["one", "two", "three"])
+ self.assertFalse(self.ls._names())
# and adds/locks by another thread still wait
self._addThread(target=self._doAddSet, args=(["nine"]))
self._addThread(target=self._doLockSet, args=(None, 1))
def testDowngradeEverything(self):
self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
set(["one", "two", "three"]))
+ self.assertTrue(self.ls.owning_all())
# Ensure all locks are now owned in exclusive mode
for name in self.ls._names():
for name in self.ls._names():
self.assertTrue(self.ls.check_owned(name, shared=1))
+ self.assertTrue(self.ls.owning_all())
+
def testPriority(self):
def _Acquire(prev, next, name, priority, success_fn):
prev.wait()
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertRaises(Queue.Empty, done_two.get_nowait)
+ def testNamesWithOpportunisticAndTimeout(self):
+ self.assertRaises(AssertionError, self.ls.acquire,
+ ["one", "two"], timeout=1.0, opportunistic=True)
+
+ def testOpportunisticWithUnknownName(self):
+ name = "unknown"
+ self.assertFalse(name in self.ls._names())
+ result = self.ls.acquire(name, opportunistic=True)
+ self.assertFalse(result)
+ self.assertFalse(self.ls.list_owned())
+
+ result = self.ls.acquire(["two", name], opportunistic=True)
+ self.assertEqual(result, set(["two"]))
+ self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+ self.ls.release()
+
+ def testSimpleOpportunisticAcquisition(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold a lock in main thread
+ self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
+
+ def fn():
+ # The lock "two" is held by the main thread
+ result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
+ self.assertEqual(result, set(["one"]))
+ self.assertEqual(self.ls.list_owned(), set(["one"]))
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+
+ # Try to acquire the lock held by the main thread
+ result = self.ls.acquire(["two"], shared=0, opportunistic=True)
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.assertFalse(result)
+ self.assertFalse(self.ls.list_owned())
+
+ # Try to acquire all locks
+ result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
+ self.assertTrue(self.ls._get_lock().is_owned(),
+ msg="Internal lock is not owned")
+ self.assertEqual(result, set(["one", "three"]))
+ self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+ self.ls.release()
+
+ self.assertFalse(self.ls.list_owned())
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testOpportunisticAcquisitionWithoutNamesExpires(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=0)
+ self.assertTrue(self.ls._get_lock().is_owned())
+
+ def fn():
+ # Try to acquire all locks in separate thread
+ result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
+ timeout=0.1)
+ self.assertFalse(result)
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.assertFalse(self.ls.list_owned())
+
+ # Try once more without a timeout
+ self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned(shared=0))
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testSharedOpportunisticAcquisitionWithoutNames(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=1)
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+
+ def fn():
+ # Try to acquire all locks in separate thread in shared mode
+ result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
+ timeout=0.1)
+ self.assertEqual(result, set(["one", "two", "three"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.ls.release()
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ # Try one in exclusive mode
+ self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testLockDeleteWithOpportunisticAcquisition(self):
+ # This test exercises some code handling LockError on acquisition, that is
+ # after all lock names have been gathered. This shouldn't happen in reality
+ # as removing locks from the set requires the lockset-internal lock, but
+ # the code should handle the situation anyway.
+ ready = threading.Event()
+ finished = threading.Event()
+
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Thread function to delete lock
+ def fn():
+ # Wait for notification
+ ready.wait()
+
+ # Delete lock named "two" by accessing lockset-internal data
+ ld = self.ls._get_lockdict()
+ self.assertTrue(ld["two"].delete())
+
+ self.done.put("deleted.two")
+
+ # Notify helper
+ finished.set()
+
+ self._addThread(target=fn)
+
+ # Notification helper, called when lock already holds internal lock.
+ # Therefore only one of the locks not yet locked can be deleted.
+ def notify(name):
+ self.done.put("notify.%s" % name)
+
+ if name == "one":
+ # Tell helper thread to delete lock "two"
+ ready.set()
+ finished.wait()
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
+ self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ # Release all locks
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertEqual(self.done.get_nowait(), "notify.one")
+ self.assertEqual(self.done.get_nowait(), "deleted.two")
+ self.assertEqual(self.done.get_nowait(), "notify.three")
+ self.assertEqual(self.done.get_nowait(), "notify.two")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
+ def setUp(self):
+ self.fn = locking._GetLsAcquireModeAndTimeouts
+
+ def testOpportunisticWithoutNames(self):
+ (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
+ self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+ self.assertTrue(ls_timeout_fn is None)
+ self.assertEqual(timeout_fn(), 0)
+
+ def testAllInputCombinations(self):
+ for want_all in [False, True]:
+ for timeout in [None, 0, 100]:
+ for opportunistic in [False, True]:
+ if (opportunistic and
+ not want_all and
+ timeout is not None):
+ # Can't accept a timeout when acquiring opportunistically
+ self.assertRaises(AssertionError, self.fn,
+ want_all, timeout, opportunistic)
+ else:
+ (mode, ls_timeout_fn, timeout_fn) = \
+ self.fn(want_all, timeout, opportunistic)
+
+ if opportunistic:
+ self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+ self.assertEqual(timeout_fn(), 0)
+ else:
+ self.assertTrue(callable(timeout_fn))
+ if want_all:
+ self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
+ else:
+ self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
+
+ if want_all:
+ self.assertTrue(callable(ls_timeout_fn))
+ else:
+ self.assertTrue(ls_timeout_fn is None)
+
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
def testLockNames(self):
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
set(self.nodegroups))
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager([], [], [], [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
set(self.nodegroups))
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager([], [], self.instances, [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager([], [], [], self.networks)
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
set(self.nodes))
self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
set(self.nodes))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
def testAcquireWholeAndPartial(self):
+ self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
set(["n2"]))
self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
set(["n2"]))
+ self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
+ self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
def testModifiableLevels(self):
self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
["BGL2"])
+ self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC,
+ ["NAL2"])
self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])