# Internal lock acquisition modes for L{LockSet}
(_LS_ACQUIRE_EXACT,
- _LS_ACQUIRE_ALL) = range(1, 3)
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
+
+_LS_ACQUIRE_MODES = frozenset([
+ _LS_ACQUIRE_EXACT,
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC,
+ ])
def ssynchronized(mylock, shared=0):
ALL_SET = None
+def _TimeoutZero():
+ """Returns the number zero.
+
+ """
+ return 0
+
+
+def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
+ """Determines modes and timeouts for L{LockSet.acquire}.
+
+ @type want_all: boolean
+ @param want_all: Whether all locks in set should be acquired
+ @param timeout: Timeout in seconds or C{None}
+ @param opportunistic: Whether locks should be acquired opportunistically
+ @rtype: tuple
+ @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
+ (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
+ acquiring the lockset-internal lock (might be C{None}) and a function to
+ calculate the timeout for acquiring individual locks
+
+ """
+ # Short circuit when no running timeout is needed
+ if opportunistic and not want_all:
+ assert timeout is None, "Got timeout for an opportunistic acquisition"
+ return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
+
+ # We need to keep track of how long we spent waiting for a lock. The
+ # timeout passed to this function is over all lock acquisitions.
+ running_timeout = utils.RunningTimeout(timeout, False)
+
+ if want_all:
+ mode = _LS_ACQUIRE_ALL
+ ls_timeout_fn = running_timeout.Remaining
+ else:
+ mode = _LS_ACQUIRE_EXACT
+ ls_timeout_fn = None
+
+ if opportunistic:
+ mode = _LS_ACQUIRE_OPPORTUNISTIC
+ timeout_fn = _TimeoutZero
+ else:
+ timeout_fn = running_timeout.Remaining
+
+ return (mode, ls_timeout_fn, timeout_fn)
+
+
class _AcquireTimeout(Exception):
"""Internal exception to abort an acquire on a timeout.
return set(result)
def acquire(self, names, timeout=None, shared=0, priority=None,
- test_notify=None):
+ opportunistic=False, test_notify=None):
"""Acquire a set of resource locks.
+ @note: When acquiring locks opportunistically, any number of locks might
+ actually be acquired, even zero.
+
@type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float or None
- @param timeout: Maximum time to acquire all locks
+ @param timeout: Maximum time to acquire all locks; for opportunistic
+ acquisitions, a timeout can only be given when C{names} is C{None}, in
+ which case it is exclusively used for acquiring the L{LockSet}-internal
+ lock; opportunistic acquisitions don't use a timeout for acquiring
+ individual locks
@type priority: integer
@param priority: Priority for acquiring locks
+ @type opportunistic: boolean
+ @param opportunistic: Acquire locks opportunistically; use the return value
+ to determine which locks were actually acquired
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
if priority is None:
priority = _DEFAULT_PRIORITY
- # We need to keep track of how long we spent waiting for a lock. The
- # timeout passed to this function is over all lock acquires.
- running_timeout = utils.RunningTimeout(timeout, False)
-
try:
if names is not None:
+ assert timeout is None or not opportunistic, \
+ ("Opportunistic acquisitions can only use a timeout if no"
+ " names are given; see docstring for details")
+
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
- return self.__acquire_inner(names, _LS_ACQUIRE_EXACT, shared, priority,
- running_timeout.Remaining, test_notify)
+ (mode, _, timeout_fn) = \
+ _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
+
+ return self.__acquire_inner(names, mode, shared, priority,
+ timeout_fn, test_notify)
else:
+ (mode, ls_timeout_fn, timeout_fn) = \
+ _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
+
# If no names are given acquire the whole set by not letting new names
# being added before we release, and getting the current list of names.
# Some of them may then be deleted later, but we'll cope with this.
# anyway, though, so we'll get the list lock exclusively as well in
# order to be able to do add() on the set while owning it.
if not self.__lock.acquire(shared=shared, priority=priority,
- timeout=running_timeout.Remaining()):
+ timeout=ls_timeout_fn()):
raise _AcquireTimeout()
+
try:
# note we own the set-lock
self._add_owned()
- return self.__acquire_inner(self.__names(), _LS_ACQUIRE_ALL, shared,
- priority, running_timeout.Remaining,
- test_notify)
+ return self.__acquire_inner(self.__names(), mode, shared,
+ priority, timeout_fn, test_notify)
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
timeout_fn, test_notify):
"""Inner logic for acquiring a number of locks.
+ Acquisition modes:
+
+ - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
+ deleted locks can be ignored as the whole set is being acquired with
+ its internal lock held
+ - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
+ timeouts and deleted locks are fatal
+ - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
+ all within the set) which should be acquired opportunistically, that is
+ failures are ignored
+
@param names: Names of the locks to be acquired
- @param mode: Lock acquisition mode
+ @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
@param shared: Whether to acquire in shared mode
- @param timeout_fn: Function returning remaining timeout
+ @param timeout_fn: Function returning remaining timeout (a function
+ always returning zero for opportunistic acquisitions)
@param priority: Priority for acquiring locks
@param test_notify: Special callback function for unittesting
"""
- assert mode in (_LS_ACQUIRE_EXACT, _LS_ACQUIRE_ALL)
+ assert mode in _LS_ACQUIRE_MODES
acquire_list = []
priority=priority,
test_notify=test_notify_fn)
except errors.LockError:
- if mode == _LS_ACQUIRE_ALL:
+ if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
# We are acquiring the whole set, it doesn't matter if this
# particular element is not there anymore.
continue
if not acq_success:
# Couldn't get lock or timeout occurred
+ if mode == _LS_ACQUIRE_OPPORTUNISTIC:
+ # Ignore timeouts on opportunistic acquisitions
+ continue
+
if timeout is None:
# This shouldn't happen as SharedLock.acquire(timeout=None) is
# blocking.
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertRaises(Queue.Empty, done_two.get_nowait)
+ def testNamesWithOpportunisticAndTimeout(self):
+ self.assertRaises(AssertionError, self.ls.acquire,
+ ["one", "two"], timeout=1.0, opportunistic=True)
+
+ def testOpportunisticWithUnknownName(self):
+ name = "unknown"
+ self.assertFalse(name in self.ls._names())
+ result = self.ls.acquire(name, opportunistic=True)
+ self.assertFalse(result)
+ self.assertFalse(self.ls.list_owned())
+
+ result = self.ls.acquire(["two", name], opportunistic=True)
+ self.assertEqual(result, set(["two"]))
+ self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+ self.ls.release()
+
+ def testSimpleOpportunisticAcquisition(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold a lock in main thread
+ self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
+
+ def fn():
+ # The lock "two" is held by the main thread
+ result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
+ self.assertEqual(result, set(["one"]))
+ self.assertEqual(self.ls.list_owned(), set(["one"]))
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+
+ # Try to acquire the lock held by the main thread
+ result = self.ls.acquire(["two"], shared=0, opportunistic=True)
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.assertFalse(result)
+ self.assertFalse(self.ls.list_owned())
+
+ # Try to acquire all locks
+ result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
+ self.assertTrue(self.ls._get_lock().is_owned(),
+ msg="Internal lock is not owned")
+ self.assertEqual(result, set(["one", "three"]))
+ self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+ self.ls.release()
+
+ self.assertFalse(self.ls.list_owned())
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testOpportunisticAcquisitionWithoutNamesExpires(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=0)
+ self.assertTrue(self.ls._get_lock().is_owned())
+
+ def fn():
+ # Try to acquire all locks in separate thread
+ result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
+ timeout=0.1)
+ self.assertFalse(result)
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.assertFalse(self.ls.list_owned())
+
+ # Try once more without a timeout
+ self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned(shared=0))
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testSharedOpportunisticAcquisitionWithoutNames(self):
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=1)
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+
+ def fn():
+ # Try to acquire all locks in separate thread in shared mode
+ result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
+ timeout=0.1)
+ self.assertEqual(result, set(["one", "two", "three"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.ls.release()
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ # Try one in exclusive mode
+ self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+ self.done.put(True)
+
+ self._addThread(target=fn)
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertTrue(self.done.get_nowait())
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testLockDeleteWithOpportunisticAcquisition(self):
+ # This test exercises some code handling LockError on acquisition, that is
+ # after all lock names have been gathered. This shouldn't happen in reality
+ # as removing locks from the set requires the lockset-internal lock, but
+ # the code should handle the situation anyway.
+ ready = threading.Event()
+ finished = threading.Event()
+
+ self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+ # Thread function to delete lock
+ def fn():
+ # Wait for notification
+ ready.wait()
+
+ # Delete lock named "two" by accessing lockset-internal data
+ ld = self.ls._get_lockdict()
+ self.assertTrue(ld["two"].delete())
+
+ self.done.put("deleted.two")
+
+ # Notify helper
+ finished.set()
+
+ self._addThread(target=fn)
+
+ # Notification helper, called while the lockset-internal lock is already
+ # held. Therefore only one of the locks not yet locked can be deleted.
+ def notify(name):
+ self.done.put("notify.%s" % name)
+
+ if name == "one":
+ # Tell helper thread to delete lock "two"
+ ready.set()
+ finished.wait()
+
+ # Hold all locks in main thread
+ self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
+ self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ # Release all locks
+ self.ls.release()
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+
+ self.assertEqual(self.done.get_nowait(), "notify.one")
+ self.assertEqual(self.done.get_nowait(), "deleted.two")
+ self.assertEqual(self.done.get_nowait(), "notify.three")
+ self.assertEqual(self.done.get_nowait(), "notify.two")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
+ def setUp(self):
+ self.fn = locking._GetLsAcquireModeAndTimeouts
+
+ def testOpportunisticWithoutNames(self):
+ (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
+ self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+ self.assertTrue(ls_timeout_fn is None)
+ self.assertEqual(timeout_fn(), 0)
+
+ def testAllInputCombinations(self):
+ for want_all in [False, True]:
+ for timeout in [None, 0, 100]:
+ for opportunistic in [False, True]:
+ if (opportunistic and
+ not want_all and
+ timeout is not None):
+ # Can't accept a timeout when acquiring named locks opportunistically
+ self.assertRaises(AssertionError, self.fn,
+ want_all, timeout, opportunistic)
+ else:
+ (mode, ls_timeout_fn, timeout_fn) = \
+ self.fn(want_all, timeout, opportunistic)
+
+ if opportunistic:
+ self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+ self.assertEqual(timeout_fn(), 0)
+ else:
+ self.assertTrue(callable(timeout_fn))
+ if want_all:
+ self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
+ else:
+ self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
+
+ if want_all:
+ self.assertTrue(callable(ls_timeout_fn))
+ else:
+ self.assertTrue(ls_timeout_fn is None)
+
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):