+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertEqual(self.done.get_nowait(), "notify.one")
+ self.assertEqual(self.done.get_nowait(), "deleted.two")
+ self.assertEqual(self.done.get_nowait(), "notify.three")
+ self.assertEqual(self.done.get_nowait(), "notify.two")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
+  """Tests for locking._GetLsAcquireModeAndTimeouts.
+
+  The function maps the (want_all, timeout, opportunistic) acquire
+  arguments to an acquire-mode constant plus two timeout callables.
+  """
+  def setUp(self):
+    # Shorthand for the function under test
+    self.fn = locking._GetLsAcquireModeAndTimeouts
+
+  def testOpportunisticWithoutNames(self):
+    # Opportunistic acquire without a timeout: opportunistic mode, no
+    # lockset-level timeout function, and a zero per-lock timeout
+    (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
+    self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+    self.assertTrue(ls_timeout_fn is None)
+    self.assertEqual(timeout_fn(), 0)
+
+  def testAllInputCombinations(self):
+    # Exhaustively check every combination of the three input arguments
+    for want_all in [False, True]:
+      for timeout in [None, 0, 100]:
+        for opportunistic in [False, True]:
+          if (opportunistic and
+              not want_all and
+              timeout is not None):
+            # Can't accept a timeout when acquiring opportunistically
+            self.assertRaises(AssertionError, self.fn,
+                              want_all, timeout, opportunistic)
+          else:
+            (mode, ls_timeout_fn, timeout_fn) = \
+              self.fn(want_all, timeout, opportunistic)
+
+            if opportunistic:
+              self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+              self.assertEqual(timeout_fn(), 0)
+            else:
+              self.assertTrue(callable(timeout_fn))
+              if want_all:
+                self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
+              else:
+                self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
+
+            # A lockset-level timeout function is only returned when
+            # acquiring the whole set
+            if want_all:
+              self.assertTrue(callable(ls_timeout_fn))
+            else:
+              self.assertTrue(ls_timeout_fn is None)
+
+
+class TestGanetiLockManager(_ThreadedTestCase):
+  """Tests for locking.GanetiLockManager, the multi-level lock manager."""
+  def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    # Resource names registered at each locking level
+    self.nodes = ["n1", "n2"]
+    self.nodegroups = ["g1", "g2"]
+    self.instances = ["i1", "i2", "i3"]
+    self.networks = ["net1", "net2", "net3"]
+    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
+                                        self.instances, self.networks)
+
+  def tearDown(self):
+    # Don't try this at home...
+    # GanetiLockManager enforces a single instance; reset it so the next
+    # test can construct a fresh one.
+    locking.GanetiLockManager._instance = None
+
+  def testLockingConstants(self):
+    # The locking library internally cheats by assuming its constants have some
+    # relationships with each other. Check those hold true.
+    # This relationship is also used in the Processor to recursively acquire
+    # the right locks. Again, please don't break it.
+    for i in range(len(locking.LEVELS)):
+      self.assertEqual(i, locking.LEVELS[i])
+
+  def testDoubleGLFails(self):
+    # Creating a second lock manager must fail (single-instance check)
+    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
+
+  def testLockNames(self):
+    # Each level reports exactly the names it was constructed with
+    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+                     set(self.nodegroups))
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
+                     set(self.instances))
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
+                     set(self.networks))
+
+  def testInitAndResources(self):
+    # Various constructor argument combinations must yield the expected
+    # name sets per level; BGL and NAL always exist
+    locking.GanetiLockManager._instance = None
+    self.GL = locking.GanetiLockManager([], [], [], [])
+    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
+
+    locking.GanetiLockManager._instance = None
+    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
+    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+                     set(self.nodegroups))
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
+
+    locking.GanetiLockManager._instance = None
+    self.GL = locking.GanetiLockManager([], [], self.instances, [])
+    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
+                     set(self.instances))
+
+    locking.GanetiLockManager._instance = None
+    self.GL = locking.GanetiLockManager([], [], [], self.networks)
+    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
+                     set(self.networks))
+
+  def testAcquireRelease(self):
+    # The BGL must be held (here shared) before lower-level acquires
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
+    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
+    self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
+    self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
+    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
+                                        shared=1))
+    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
+    self.GL.release(locking.LEVEL_NODE, ["n2"])
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
+    self.GL.release(locking.LEVEL_NODE)
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
+    self.GL.release(locking.LEVEL_NODEGROUP)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    # Acquiring a name not registered at the level must fail
+    self.assertRaises(errors.LockError, self.GL.acquire,
+                      locking.LEVEL_INSTANCE, ["i5"])
+    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))
+
+  def testAcquireWholeSets(self):
+    # Passing None as names acquires every lock at that level
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
+                      set(self.instances))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
+                      set(self.instances))
+    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
+                      set(self.nodegroups))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
+                      set(self.nodegroups))
+    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
+                      set(self.nodes))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
+                      set(self.nodes))
+    self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
+    self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
+    self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))
+    self.GL.release(locking.LEVEL_NODE)
+    self.GL.release(locking.LEVEL_NODEGROUP)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self.GL.release(locking.LEVEL_CLUSTER)
+
+  def testAcquireWholeAndPartial(self):
+    # owning_all is true only for levels acquired with names=None
+    self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
+                      set(self.instances))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
+                      set(self.instances))
+    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
+                      set(["n2"]))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
+                      set(["n2"]))
+    self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
+    self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))
+    self.GL.release(locking.LEVEL_NODE)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self.GL.release(locking.LEVEL_CLUSTER)
+
+  def testBGLDependency(self):
+    # Lower levels cannot be acquired without holding the BGL...
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_NODE, ["n1", "n2"])
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_INSTANCE, ["i3"])
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_NODEGROUP, ["g1"])
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+    self.GL.acquire(locking.LEVEL_NODE, ["n1"])
+    # ...and the BGL cannot be released while lower-level locks are held
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER, ["BGL"])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER)
+    self.GL.release(locking.LEVEL_NODE)
+    self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER, ["BGL"])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
+    self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER, ["BGL"])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER)
+    self.GL.release(locking.LEVEL_NODEGROUP)
+    self.GL.release(locking.LEVEL_CLUSTER)
+
+  def testWrongOrder(self):
+    # Levels must be acquired in strictly increasing order
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+    self.GL.acquire(locking.LEVEL_NODE, ["n2"])
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_NODE, ["n1"])
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_NODEGROUP, ["g1"])
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_INSTANCE, ["i2"])
+
+  def testModifiableLevels(self):
+    # The cluster and node-allocation levels reject add/remove
+    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
+                      ["BGL2"])
+    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC,
+                      ["NAL2"])
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
+    self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
+    self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
+    self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
+    self.GL.add(locking.LEVEL_NODE, ["n3"])
+    self.GL.remove(locking.LEVEL_NODE, ["n1"])
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
+    self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
+    self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
+    self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))
+    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
+                      ["BGL2"])
+
+  # Helper function to run as a thread that shares the BGL and then acquires
+  # some locks at another level.
+  def _doLock(self, level, names, shared):
+    try:
+      self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+      self.GL.acquire(level, names, shared=shared)
+      self.done.put("DONE")
+      self.GL.release(level)
+      self.GL.release(locking.LEVEL_CLUSTER)
+    except errors.LockError:
+      self.done.put("ERR")
+
+  @_Repeat
+  def testConcurrency(self):
+    # Several threads can share the BGL and work at lower levels
+    self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, "i1", 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "DONE")
+    self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, "i1", 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "DONE")
+    # A thread wanting "i3" (held exclusively here) must block...
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, "i3", 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    # ...until we release it
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "DONE")
+    # Shared acquires of the same lock do not block each other
+    self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, "i2", 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "DONE")
+    # ...but an exclusive acquire blocks until the shared holder releases
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, "i2", 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self._waitThreads()
+    self.assertEqual(self.done.get(True, 1), "DONE")
+    self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
+
+
+class TestLockMonitor(_ThreadedTestCase):
+  def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    # Monitor under test; locks register themselves with it on creation
+    self.lm = locking.LockMonitor()
+
+  def testSingleThread(self):
+    # Register many locks with the monitor and check they are listed
+    locks = []
+
+    for i in range(100):
+      name = "TestLock%s" % i
+      locks.append(locking.SharedLock(name, monitor=self.lm))
+
+    self.assertEqual(len(self.lm._locks), len(locks))
+    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+    self.assertEqual(len(result.fields), 1)
+    self.assertEqual(len(result.data), 100)
+
+    # Delete all locks
+    del locks[:]
+
+    # The garbage collector might need some time
+    def _CheckLocks():
+      if self.lm._locks:
+        raise utils.RetryAgain()
+
+    utils.Retry(_CheckLocks, 0.1, 30.0)
+
+    # Once all references are gone the monitor must no longer list them
+    self.assertFalse(self.lm._locks)
+
+  def testMultiThread(self):
+    # Create locks from multiple threads in a deterministic order and
+    # verify the monitor's query output for idle, exclusive and shared
+    # ownership
+    locks = []
+
+    def _CreateLock(prev, next, name):
+      # Wait for the previous thread, create the lock, signal the next
+      # one; this serializes creation so the order is deterministic
+      prev.wait()
+      locks.append(locking.SharedLock(name, monitor=self.lm))
+      if next:
+        next.set()
+
+    expnames = []
+
+    first = threading.Event()
+    prev = first
+
+    # Use a deterministic random generator
+    for i in random.Random(4263).sample(range(100), 33):
+      name = "MtTestLock%s" % i
+      expnames.append(name)
+
+      ev = threading.Event()
+      self._addThread(target=_CreateLock, args=(prev, ev, name))
+      prev = ev
+
+    # Add locks
+    first.set()
+    self._waitThreads()
+
+    # Check order in which locks were added
+    self.assertEqual([i.name for i in locks], expnames)
+
+    # Check query result
+    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+    self.assert_(isinstance(result, dict))
+    response = objects.QueryResponse.FromDict(result)
+    self.assertEqual(response.data,
+                     [[(constants.RS_NORMAL, name),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, [])]
+                      for name in utils.NiceSort(expnames)])
+    self.assertEqual(len(response.fields), 4)
+    self.assertEqual(["name", "mode", "owner", "pending"],
+                     [fdef.name for fdef in response.fields])
+
+    # Test exclusive acquire
+    for tlock in locks[::4]:
+      tlock.acquire(shared=0)
+      try:
+        def _GetExpResult(name):
+          # Only the currently held lock shows as exclusively owned
+          if tlock.name == name:
+            return [(constants.RS_NORMAL, name),
+                    (constants.RS_NORMAL, "exclusive"),
+                    (constants.RS_NORMAL,
+                     [threading.currentThread().getName()]),
+                    (constants.RS_NORMAL, [])]
+          return [(constants.RS_NORMAL, name),
+                  (constants.RS_NORMAL, None),
+                  (constants.RS_NORMAL, None),
+                  (constants.RS_NORMAL, [])]
+
+        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [_GetExpResult(name)
+                          for name in utils.NiceSort(expnames)])
+      finally:
+        tlock.release()
+
+    # Test shared acquire
+    def _Acquire(lock, shared, ev, notify):
+      # Acquire, report readiness via notify, hold until ev is set
+      lock.acquire(shared=shared)
+      try:
+        notify.set()
+        ev.wait()
+      finally:
+        lock.release()
+
+    for tlock1 in locks[::11]:
+      for tlock2 in locks[::-15]:
+        if tlock2 == tlock1:
+          # Avoid deadlocks
+          continue
+
+        for tlock3 in locks[::10]:
+          if tlock3 in (tlock2, tlock1):
+            # Avoid deadlocks
+            continue
+
+          releaseev = threading.Event()
+
+          # Acquire locks
+          acquireev = []
+          tthreads1 = []
+          for i in range(3):
+            ev = threading.Event()
+            tthreads1.append(self._addThread(target=_Acquire,
+                                             args=(tlock1, 1, releaseev, ev)))
+            acquireev.append(ev)
+
+          ev = threading.Event()
+          tthread2 = self._addThread(target=_Acquire,
+                                     args=(tlock2, 1, releaseev, ev))
+          acquireev.append(ev)
+
+          ev = threading.Event()
+          tthread3 = self._addThread(target=_Acquire,
+                                     args=(tlock3, 0, releaseev, ev))
+          acquireev.append(ev)
+
+          # Wait for all locks to be acquired
+          for i in acquireev:
+            i.wait()
+
+          # Check query result
+          result = self.lm.QueryLocks(["name", "mode", "owner"])
+          response = objects.QueryResponse.FromDict(result)
+          for (name, mode, owner) in response.data:
+            (name_status, name_value) = name
+            (owner_status, owner_value) = owner
+
+            self.assertEqual(name_status, constants.RS_NORMAL)
+            self.assertEqual(owner_status, constants.RS_NORMAL)
+
+            if name_value == tlock1.name:
+              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+              self.assertEqual(set(owner_value),
+                               set(i.getName() for i in tthreads1))
+              continue
+
+            if name_value == tlock2.name:
+              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+              self.assertEqual(owner_value, [tthread2.getName()])
+              continue
+
+            if name_value == tlock3.name:
+              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
+              self.assertEqual(owner_value, [tthread3.getName()])
+              continue
+
+            # Every other lock must be unowned
+            self.assert_(name_value in expnames)
+            self.assertEqual(mode, (constants.RS_NORMAL, None))
+            self.assert_(owner_value is None)
+
+          # Release locks again
+          releaseev.set()
+
+          self._waitThreads()
+
+          result = self.lm.QueryLocks(["name", "mode", "owner"])
+          self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                           [[(constants.RS_NORMAL, name),
+                             (constants.RS_NORMAL, None),
+                             (constants.RS_NORMAL, None)]
+                            for name in utils.NiceSort(expnames)])
+
+  def testDelete(self):
+    lock = locking.SharedLock("TestLock", monitor=self.lm)
+
+    # The lock is registered and initially unowned
+    self.assertEqual(len(self.lm._locks), 1)
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lock.name),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]])
+
+    lock.delete()
+
+    # A deleted lock stays registered but reports "deleted" as its mode
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lock.name),
+                       (constants.RS_NORMAL, "deleted"),
+                       (constants.RS_NORMAL, None)]])
+    self.assertEqual(len(self.lm._locks), 1)
+
+  def testPending(self):
+    # Waiting acquires must show up in the "pending" query field
+    def _Acquire(lock, shared, prev, next):
+      # Serialize the threads: wait for the previous one before acquiring
+      prev.wait()
+
+      lock.acquire(shared=shared, test_notify=next.set)
+      try:
+        pass
+      finally:
+        lock.release()
+
+    lock = locking.SharedLock("ExcLock", monitor=self.lm)
+
+    # Run once with exclusive and once with shared waiters
+    for shared in [0, 1]:
+      lock.acquire()
+      try:
+        self.assertEqual(len(self.lm._locks), 1)
+        result = self.lm.QueryLocks(["name", "mode", "owner"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [[(constants.RS_NORMAL, lock.name),
+                           (constants.RS_NORMAL, "exclusive"),
+                           (constants.RS_NORMAL,
+                            [threading.currentThread().getName()])]])
+
+        threads = []
+
+        first = threading.Event()
+        prev = first
+
+        for i in range(5):
+          ev = threading.Event()
+          threads.append(self._addThread(target=_Acquire,
+                                         args=(lock, shared, prev, ev)))
+          prev = ev
+
+        # Start acquires
+        first.set()
+
+        # Wait for last acquire to start waiting
+        prev.wait()
+
+        # NOTE: This works only because QueryLocks will acquire the
+        # lock-internal lock again and won't be able to get the information
+        # until it has the lock. By then the acquire should be registered in
+        # SharedLock.__pending (otherwise it's a bug).
+
+        # All acquires are waiting now
+        if shared:
+          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
+        else:
+          pending = [("exclusive", [t.getName()]) for t in threads]
+
+        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [[(constants.RS_NORMAL, lock.name),
+                           (constants.RS_NORMAL, "exclusive"),
+                           (constants.RS_NORMAL,
+                            [threading.currentThread().getName()]),
+                           (constants.RS_NORMAL, pending)]])
+
+        self.assertEqual(len(self.lm._locks), 1)
+      finally:
+        lock.release()
+
+      self._waitThreads()
+
+      # No pending acquires
+      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                       [[(constants.RS_NORMAL, lock.name),
+                         (constants.RS_NORMAL, None),
+                         (constants.RS_NORMAL, None),
+                         (constants.RS_NORMAL, [])]])
+
+      self.assertEqual(len(self.lm._locks), 1)
+
+  def testDeleteAndRecreate(self):
+    # Several locks may share a name; check query ordering and cleanup
+    lname = "TestLock101923193"
+
+    # Create some locks with the same name and keep all references
+    locks = [locking.SharedLock(lname, monitor=self.lm)
+             for _ in range(5)]
+
+    self.assertEqual(len(self.lm._locks), len(locks))
+
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]] * 5)
+
+    locks[2].delete()
+
+    # Check information order
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]] * 2 +
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, "deleted"),
+                       (constants.RS_NORMAL, None)]] +
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]] * 2)
+
+    locks[1].acquire(shared=0)
+
+    # Expected status per lock, in registration order
+    last_status = [
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, None),
+       (constants.RS_NORMAL, None)],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, "exclusive"),
+       (constants.RS_NORMAL, [threading.currentThread().getName()])],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, "deleted"),
+       (constants.RS_NORMAL, None)],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, None),
+       (constants.RS_NORMAL, None)],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, None),
+       (constants.RS_NORMAL, None)],
+      ]
+
+    # Check information order
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
+
+    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
+    self.assertEqual(len(self.lm._locks), len(locks))
+
+    # Check lock deletion
+    for idx in range(len(locks)):
+      del locks[0]
+      assert gc.isenabled()
+      gc.collect()
+      self.assertEqual(len(self.lm._locks), len(locks))
+      result = self.lm.QueryLocks(["name", "mode", "owner"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                       last_status[idx + 1:])
+
+    # All locks should have been deleted
+    assert not locks
+    self.assertFalse(self.lm._locks)
+
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+
+  class _FakeLock:
+    """Test helper faking the lock side of the LockMonitor interface."""
+    def __init__(self):
+      # Queue of (expected_fields, result) pairs consumed by GetLockInfo
+      self._info = []
+
+    def AddResult(self, *args):
+      # Queue one (expected_fields, result) pair
+      self._info.append(args)
+
+    def CountPending(self):
+      # Number of queued results not yet consumed
+      return len(self._info)
+
+    def GetLockInfo(self, requested):
+      # Pop the next queued result, verifying the requested field set
+      # matches what the test expected
+      (exp_requested, result) = self._info.pop(0)
+
+      if exp_requested != requested:
+        raise Exception("Requested information (%s) does not match"
+                        " expectations (%s)" % (requested, exp_requested))
+
+      return result
+
+  def testMultipleResults(self):
+    # Check merging and ordering of results from several registered locks
+    fl1 = self._FakeLock()
+    fl2 = self._FakeLock()
+
+    self.lm.RegisterLock(fl1)
+    self.lm.RegisterLock(fl2)
+
+    # Empty information
+    for i in [fl1, fl2]:
+      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+    for i in [fl1, fl2]:
+      self.assertEqual(i.CountPending(), 0)
+
+    # Check ordering
+    for fn in [lambda x: x, reversed, sorted]:
+      fl1.AddResult(set(), list(fn([
+        ("aaa", None, None, None),
+        ("bbb", None, None, None),
+        ])))
+      fl2.AddResult(set(), [])
+      result = self.lm.QueryLocks(["name"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+        [(constants.RS_NORMAL, "aaa")],
+        [(constants.RS_NORMAL, "bbb")],
+        ])
+      for i in [fl1, fl2]:
+        self.assertEqual(i.CountPending(), 0)
+
+      for fn2 in [lambda x: x, reversed, sorted]:
+        fl1.AddResult(set([query.LQ_MODE]), list(fn([
+          # Same name, but different information
+          ("aaa", "mode0", None, None),
+          ("aaa", "mode1", None, None),
+          ("aaa", "mode2", None, None),
+          ("aaa", "mode3", None, None),
+          ])))
+        fl2.AddResult(set([query.LQ_MODE]), [
+          ("zzz", "end", None, None),
+          ("000", "start", None, None),
+          ] + list(fn2([
+          ("aaa", "b200", None, None),
+          ("aaa", "b300", None, None),
+          ])))
+        result = self.lm.QueryLocks(["name", "mode"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
+          ] + list(fn([
+          # Name is the same, so order must be equal to incoming order
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
+          ])) + list(fn2([
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
+          ])) + [
+          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
+          ])
+        for i in [fl1, fl2]:
+          self.assertEqual(i.CountPending(), 0)