Revision 44b4eddc
b/lib/locking.py | ||
---|---|---|
434 | 434 |
|
435 | 435 |
# Register with lock monitor |
436 | 436 |
if monitor: |
437 |
logging.debug("Adding lock %s to monitor", name) |
|
437 | 438 |
monitor.RegisterLock(self) |
438 | 439 |
|
439 |
def GetInfo(self, requested): |
|
440 |
def GetLockInfo(self, requested):
|
|
440 | 441 |
"""Retrieves information for querying locks. |
441 | 442 |
|
442 | 443 |
@type requested: set |
... | ... | |
489 | 490 |
else: |
490 | 491 |
pending = None |
491 | 492 |
|
492 |
return (self.name, mode, owner_names, pending)
|
|
493 |
return [(self.name, mode, owner_names, pending)]
|
|
493 | 494 |
finally: |
494 | 495 |
self.__lock.release() |
495 | 496 |
|
... | ... | |
1638 | 1639 |
return self.__keyring[level].remove(names) |
1639 | 1640 |
|
1640 | 1641 |
|
1641 |
def _MonitorSortKey((num, item)):
|
|
1642 |
def _MonitorSortKey((item, idx, num)):
|
|
1642 | 1643 |
"""Sorting key function. |
1643 | 1644 |
|
1644 |
Sort by name, then by incoming order. |
|
1645 |
Sort by name, registration order and then order of information. This provides |
|
1646 |
a stable sort order over different providers, even if they return the same |
|
1647 |
name. |
|
1645 | 1648 |
|
1646 | 1649 |
""" |
1647 | 1650 |
(name, _, _, _) = item |
1648 | 1651 |
|
1649 |
return (utils.NiceSortKey(name), num) |
|
1652 |
return (utils.NiceSortKey(name), num, idx)
|
|
1650 | 1653 |
|
1651 | 1654 |
|
1652 | 1655 |
class LockMonitor(object): |
... | ... | |
1666 | 1669 |
self._locks = weakref.WeakKeyDictionary() |
1667 | 1670 |
|
1668 | 1671 |
@ssynchronized(_LOCK_ATTR) |
1669 |
def RegisterLock(self, lock):
|
|
1672 |
def RegisterLock(self, provider):
|
|
1670 | 1673 |
"""Registers a new lock. |
1671 | 1674 |
|
1675 |
@param provider: Object with a callable method named C{GetLockInfo}, taking |
|
1676 |
a single C{set} containing the requested information items |
|
1677 |
@note: It would be nicer to only receive the function generating the |
|
1678 |
requested information but, as it turns out, weak references to bound |
|
1679 |
methods (e.g. C{self.GetLockInfo}) are tricky; there are several |
|
1680 |
workarounds, but none of the ones I found works properly in combination |
|
1681 |
with a standard C{WeakKeyDictionary} |
|
1682 |
|
|
1672 | 1683 |
""" |
1673 |
logging.debug("Registering lock %s", lock.name) |
|
1674 |
assert lock not in self._locks, "Duplicate lock registration" |
|
1684 |
assert provider not in self._locks, "Duplicate registration" |
|
1675 | 1685 |
|
1676 | 1686 |
# There used to be a check for duplicate names here. As it turned out, when |
1677 | 1687 |
# a lock is re-created with the same name in a very short timeframe, the |
... | ... | |
1679 | 1689 |
# By keeping track of the order of incoming registrations, a stable sort |
1680 | 1690 |
# ordering can still be guaranteed. |
1681 | 1691 |
|
1682 |
self._locks[lock] = self._counter.next()
|
|
1692 |
self._locks[provider] = self._counter.next()
|
|
1683 | 1693 |
|
1684 |
@ssynchronized(_LOCK_ATTR) |
|
1685 | 1694 |
def _GetLockInfo(self, requested): |
1686 |
"""Get information from all locks while the monitor lock is held.
|
|
1695 |
"""Get information from all locks. |
|
1687 | 1696 |
|
1688 | 1697 |
""" |
1689 |
return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()] |
|
1698 |
# Must hold lock while getting consistent list of tracked items |
|
1699 |
self._lock.acquire(shared=1) |
|
1700 |
try: |
|
1701 |
items = self._locks.items() |
|
1702 |
finally: |
|
1703 |
self._lock.release() |
|
1704 |
|
|
1705 |
return [(info, idx, num) |
|
1706 |
for (provider, num) in items |
|
1707 |
for (idx, info) in enumerate(provider.GetLockInfo(requested))] |
|
1690 | 1708 |
|
1691 | 1709 |
def _Query(self, fields): |
1692 | 1710 |
"""Queries information from all locks. |
... | ... | |
1703 | 1721 |
key=_MonitorSortKey) |
1704 | 1722 |
|
1705 | 1723 |
# Extract lock information and build query data |
1706 |
return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
|
|
1724 |
return (qobj, query.LockQueryData(map(operator.itemgetter(0), lockinfo)))
|
|
1707 | 1725 |
|
1708 | 1726 |
def QueryLocks(self, fields): |
1709 | 1727 |
"""Queries information from all locks. |
b/test/ganeti.locking_unittest.py | ||
---|---|---|
689 | 689 |
ev.wait() |
690 | 690 |
|
691 | 691 |
# Check lock information |
692 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])), |
|
693 |
(self.sl.name, "exclusive", [th_excl1.getName()], None))
|
|
694 |
(_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
|
|
692 |
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
|
|
693 |
[(self.sl.name, "exclusive", [th_excl1.getName()], None)])
|
|
694 |
[(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
|
|
695 | 695 |
self.assertEqual([(pendmode, sorted(waiting)) |
696 | 696 |
for (pendmode, waiting) in pending], |
697 | 697 |
[("exclusive", [th_excl2.getName()]), |
... | ... | |
705 | 705 |
ev.wait() |
706 | 706 |
|
707 | 707 |
# Check lock information again |
708 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])), |
|
709 |
(self.sl.name, "shared", None, |
|
710 |
[("exclusive", [th_excl2.getName()])])) |
|
711 |
(_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER])) |
|
708 |
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, |
|
709 |
query.LQ_PENDING])), |
|
710 |
[(self.sl.name, "shared", None, |
|
711 |
[("exclusive", [th_excl2.getName()])])]) |
|
712 |
[(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER])) |
|
712 | 713 |
self.assertEqual(set(owner), set([th_excl1.getName()] + |
713 | 714 |
[th.getName() for th in th_shared])) |
714 | 715 |
|
... | ... | |
718 | 719 |
|
719 | 720 |
self._waitThreads() |
720 | 721 |
|
721 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER, |
|
722 |
query.LQ_PENDING])), |
|
723 |
(self.sl.name, None, None, []))
|
|
722 |
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
|
|
723 |
query.LQ_PENDING])),
|
|
724 |
[(self.sl.name, None, None, [])])
|
|
724 | 725 |
|
725 | 726 |
@_Repeat |
726 | 727 |
def testMixedAcquireTimeout(self): |
... | ... | |
887 | 888 |
prev.wait() |
888 | 889 |
|
889 | 890 |
# Check lock information |
890 |
self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None)) |
|
891 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])), |
|
892 |
(self.sl.name, "exclusive", |
|
893 |
[threading.currentThread().getName()], None)) |
|
891 |
self.assertEqual(self.sl.GetLockInfo(set()), |
|
892 |
[(self.sl.name, None, None, None)]) |
|
893 |
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), |
|
894 |
[(self.sl.name, "exclusive", |
|
895 |
[threading.currentThread().getName()], None)]) |
|
894 | 896 |
|
895 |
self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio) |
|
897 |
self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])), |
|
898 |
perprio) |
|
896 | 899 |
|
897 | 900 |
# Let threads acquire the lock |
898 | 901 |
self.sl.release() |
... | ... | |
913 | 916 |
|
914 | 917 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
915 | 918 |
|
916 |
def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
|
|
919 |
def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
|
|
917 | 920 |
self.assertEqual(name, self.sl.name) |
918 | 921 |
self.assert_(mode is None) |
919 | 922 |
self.assert_(owner is None) |
... | ... | |
2154 | 2157 |
result = self.lm.QueryLocks(["name", "mode", "owner"]) |
2155 | 2158 |
self.assertEqual(objects.QueryResponse.FromDict(result).data, []) |
2156 | 2159 |
|
2160 |
class _FakeLock: |
|
2161 |
def __init__(self): |
|
2162 |
self._info = [] |
|
2163 |
|
|
2164 |
def AddResult(self, *args): |
|
2165 |
self._info.append(args) |
|
2166 |
|
|
2167 |
def CountPending(self): |
|
2168 |
return len(self._info) |
|
2169 |
|
|
2170 |
def GetLockInfo(self, requested): |
|
2171 |
(exp_requested, result) = self._info.pop(0) |
|
2172 |
|
|
2173 |
if exp_requested != requested: |
|
2174 |
raise Exception("Requested information (%s) does not match" |
|
2175 |
" expectations (%s)" % (requested, exp_requested)) |
|
2176 |
|
|
2177 |
return result |
|
2178 |
|
|
2179 |
def testMultipleResults(self): |
|
2180 |
fl1 = self._FakeLock() |
|
2181 |
fl2 = self._FakeLock() |
|
2182 |
|
|
2183 |
self.lm.RegisterLock(fl1) |
|
2184 |
self.lm.RegisterLock(fl2) |
|
2185 |
|
|
2186 |
# Empty information |
|
2187 |
for i in [fl1, fl2]: |
|
2188 |
i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), []) |
|
2189 |
result = self.lm.QueryLocks(["name", "mode", "owner"]) |
|
2190 |
self.assertEqual(objects.QueryResponse.FromDict(result).data, []) |
|
2191 |
for i in [fl1, fl2]: |
|
2192 |
self.assertEqual(i.CountPending(), 0) |
|
2193 |
|
|
2194 |
# Check ordering |
|
2195 |
for fn in [lambda x: x, reversed, sorted]: |
|
2196 |
fl1.AddResult(set(), list(fn([ |
|
2197 |
("aaa", None, None, None), |
|
2198 |
("bbb", None, None, None), |
|
2199 |
]))) |
|
2200 |
fl2.AddResult(set(), []) |
|
2201 |
result = self.lm.QueryLocks(["name"]) |
|
2202 |
self.assertEqual(objects.QueryResponse.FromDict(result).data, [ |
|
2203 |
[(constants.RS_NORMAL, "aaa")], |
|
2204 |
[(constants.RS_NORMAL, "bbb")], |
|
2205 |
]) |
|
2206 |
for i in [fl1, fl2]: |
|
2207 |
self.assertEqual(i.CountPending(), 0) |
|
2208 |
|
|
2209 |
for fn2 in [lambda x: x, reversed, sorted]: |
|
2210 |
fl1.AddResult(set([query.LQ_MODE]), list(fn([ |
|
2211 |
# Same name, but different information |
|
2212 |
("aaa", "mode0", None, None), |
|
2213 |
("aaa", "mode1", None, None), |
|
2214 |
("aaa", "mode2", None, None), |
|
2215 |
("aaa", "mode3", None, None), |
|
2216 |
]))) |
|
2217 |
fl2.AddResult(set([query.LQ_MODE]), [ |
|
2218 |
("zzz", "end", None, None), |
|
2219 |
("000", "start", None, None), |
|
2220 |
] + list(fn2([ |
|
2221 |
("aaa", "b200", None, None), |
|
2222 |
("aaa", "b300", None, None), |
|
2223 |
]))) |
|
2224 |
result = self.lm.QueryLocks(["name", "mode"]) |
|
2225 |
self.assertEqual(objects.QueryResponse.FromDict(result).data, [ |
|
2226 |
[(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")], |
|
2227 |
] + list(fn([ |
|
2228 |
# Name is the same, so order must be equal to incoming order |
|
2229 |
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")], |
|
2230 |
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")], |
|
2231 |
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")], |
|
2232 |
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")], |
|
2233 |
])) + list(fn2([ |
|
2234 |
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")], |
|
2235 |
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")], |
|
2236 |
])) + [ |
|
2237 |
[(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")], |
|
2238 |
]) |
|
2239 |
for i in [fl1, fl2]: |
|
2240 |
self.assertEqual(i.CountPending(), 0) |
|
2241 |
|
|
2157 | 2242 |
|
2158 | 2243 |
if __name__ == '__main__': |
2159 | 2244 |
testutils.GanetiTestProgram() |
Also available in: Unified diff