locking: Implement opportunistic locking in LockSet
authorMichael Hanselmann <hansmi@google.com>
Mon, 19 Nov 2012 20:59:47 +0000 (21:59 +0100)
committerMichael Hanselmann <hansmi@google.com>
Mon, 3 Dec 2012 13:56:02 +0000 (14:56 +0100)
This patch adds a new parameter named “opportunistic” to
“LockSet.acquire”. When enabled, the lockset tries to acquire as many
locks as possible without waiting for them (with the exception of the
lockset-internal lock in case the whole set is acquired). This is
implemented by using a timeout of 0 seconds when acquiring individual
locks. Commit 03c5291 made such acquisitions significantly cheaper.

The most complicated code included in this patch is probably the helper
function used to determine which mode to use and which timeout functions
are needed.

Full unit tests are included for the new and changed code
(“locking.py”'s overall coverage is at 97%).

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/locking.py
test/ganeti.locking_unittest.py

index ec7180a..b0b4758 100644 (file)
@@ -53,7 +53,14 @@ _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
 
 # Internal lock acquisition modes for L{LockSet}
 (_LS_ACQUIRE_EXACT,
- _LS_ACQUIRE_ALL) = range(1, 3)
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
+
+_LS_ACQUIRE_MODES = frozenset([  # checked by LockSet.__acquire_inner
+  _LS_ACQUIRE_EXACT,
+  _LS_ACQUIRE_ALL,
+  _LS_ACQUIRE_OPPORTUNISTIC,
+  ])
 
 
 def ssynchronized(mylock, shared=0):
@@ -917,6 +924,52 @@ class SharedLock(object):
 ALL_SET = None
 
 
+def _TimeoutZero():
+  """Returns zero; timeout function for acquisitions which must not wait.
+
+  """
+  return 0
+
+
+def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
+  """Determines modes and timeouts for L{LockSet.acquire}.
+
+  @type want_all: boolean
+  @param want_all: Whether all locks in set should be acquired
+  @param timeout: Timeout in seconds or C{None}
+  @param opportunistic: Whether locks should be acquired opportunistically
+  @rtype: tuple
+  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
+    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
+    acquiring the lockset-internal lock (might be C{None}) and a function to
+    calculate the timeout for acquiring individual locks
+
+  """
+  # Short circuit: no running timeout for opportunistic named acquisitions
+  if opportunistic and not want_all:
+    assert timeout is None, "Got timeout for an opportunistic acquisition"
+    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
+
+  # We need to keep track of how long we spent waiting for a lock. The
+  # timeout passed to this function is over all lock acquisitions.
+  running_timeout = utils.RunningTimeout(timeout, False)
+
+  if want_all:
+    mode = _LS_ACQUIRE_ALL
+    ls_timeout_fn = running_timeout.Remaining
+  else:
+    mode = _LS_ACQUIRE_EXACT
+    ls_timeout_fn = None
+
+  if opportunistic:
+    mode = _LS_ACQUIRE_OPPORTUNISTIC
+    timeout_fn = _TimeoutZero
+  else:
+    timeout_fn = running_timeout.Remaining
+
+  return (mode, ls_timeout_fn, timeout_fn)
+
+
 class _AcquireTimeout(Exception):
   """Internal exception to abort an acquire on a timeout.
 
@@ -1114,9 +1167,12 @@ class LockSet:
     return set(result)
 
   def acquire(self, names, timeout=None, shared=0, priority=None,
-              test_notify=None):
+              opportunistic=False, test_notify=None):
     """Acquire a set of resource locks.
 
+    @note: When acquiring locks opportunistically, any number of locks might
+      actually be acquired, even zero.
+
     @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
@@ -1124,9 +1180,16 @@ class LockSet:
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float or None
-    @param timeout: Maximum time to acquire all locks
+    @param timeout: Maximum time to acquire all locks; for opportunistic
+      acquisitions, a timeout can only be given when C{names} is C{None}, in
+      which case it is exclusively used for acquiring the L{LockSet}-internal
+      lock; opportunistic acquisitions don't use a timeout for acquiring
+      individual locks
     @type priority: integer
     @param priority: Priority for acquiring locks
+    @type opportunistic: boolean
+    @param opportunistic: Don't wait for individual locks, skip unavailable
+      ones; use the return value to determine which locks were acquired
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
@@ -1146,20 +1209,26 @@ class LockSet:
     if priority is None:
       priority = _DEFAULT_PRIORITY
 
-    # We need to keep track of how long we spent waiting for a lock. The
-    # timeout passed to this function is over all lock acquires.
-    running_timeout = utils.RunningTimeout(timeout, False)
-
     try:
       if names is not None:
+        assert timeout is None or not opportunistic, \
+          ("Opportunistic acquisitions can only use a timeout if no"
+           " names are given; see docstring for details")
+
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
           names = [names]
 
-        return self.__acquire_inner(names, _LS_ACQUIRE_EXACT, shared, priority,
-                                    running_timeout.Remaining, test_notify)
+        (mode, _, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
+        # The lockset-internal lock isn't acquired for explicit names
+        return self.__acquire_inner(names, mode, shared, priority,
+                                    timeout_fn, test_notify)
 
       else:
+        (mode, ls_timeout_fn, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
+
         # If no names are given acquire the whole set by not letting new names
         # being added before we release, and getting the current list of names.
         # Some of them may then be deleted later, but we'll cope with this.
@@ -1170,15 +1239,15 @@ class LockSet:
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
         if not self.__lock.acquire(shared=shared, priority=priority,
-                                   timeout=running_timeout.Remaining()):
+                                   timeout=ls_timeout_fn()):
           raise _AcquireTimeout()
+
         try:
           # note we own the set-lock
           self._add_owned()
 
-          return self.__acquire_inner(self.__names(), _LS_ACQUIRE_ALL, shared,
-                                      priority, running_timeout.Remaining,
-                                      test_notify)
+          return self.__acquire_inner(self.__names(), mode, shared,
+                                      priority, timeout_fn, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
@@ -1194,15 +1263,27 @@ class LockSet:
                       timeout_fn, test_notify):
     """Inner logic for acquiring a number of locks.
 
+    Acquisition modes:
+
+      - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
+        deleted locks can be ignored as the whole set is being acquired with
+        its internal lock held
+      - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
+        timeouts and deleted locks are fatal
+      - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
+        all within the set) which should be acquired opportunistically, that is
+        failures are ignored
+
     @param names: Names of the locks to be acquired
-    @param mode: Lock acquisition mode
+    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
     @param shared: Whether to acquire in shared mode
-    @param timeout_fn: Function returning remaining timeout
+    @param timeout_fn: Function returning remaining timeout (always zero for
+      opportunistic acquisitions)
     @param priority: Priority for acquiring locks
     @param test_notify: Special callback function for unittesting
 
     """
-    assert mode in (_LS_ACQUIRE_EXACT, _LS_ACQUIRE_ALL)
+    assert mode in _LS_ACQUIRE_MODES
 
     acquire_list = []
 
@@ -1246,7 +1327,7 @@ class LockSet:
                                      priority=priority,
                                      test_notify=test_notify_fn)
         except errors.LockError:
-          if mode == _LS_ACQUIRE_ALL:
+          if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
             # We are acquiring the whole set, it doesn't matter if this
             # particular element is not there anymore.
             continue
@@ -1256,6 +1337,10 @@ class LockSet:
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
+          if mode == _LS_ACQUIRE_OPPORTUNISTIC:
+            # Opportunistic acquisitions skip locks they couldn't get
+            continue
+
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
index 6863d6d..705103f 100755 (executable)
@@ -1780,6 +1780,235 @@ class TestLockSet(_ThreadedTestCase):
     self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.assertRaises(Queue.Empty, done_two.get_nowait)
 
+  def testNamesWithOpportunisticAndTimeout(self):
+    self.assertRaises(AssertionError, self.ls.acquire,  # invalid combination
+                      ["one", "two"], timeout=1.0, opportunistic=True)
+
+  def testOpportunisticWithUnknownName(self):
+    name = "unknown"
+    self.assertFalse(name in self.ls._names())
+    result = self.ls.acquire(name, opportunistic=True)
+    self.assertFalse(result)  # unknown names are skipped
+    self.assertFalse(self.ls.list_owned())
+
+    result = self.ls.acquire(["two", name], opportunistic=True)
+    self.assertEqual(result, set(["two"]))  # only known name acquired
+    self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+    self.ls.release()
+
+  def testSimpleOpportunisticAcquisition(self):
+    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+    # Hold lock "two" exclusively in main thread
+    self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
+
+    def fn():
+      # The lock "two" is held by the main thread
+      result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
+      self.assertEqual(result, set(["one"]))
+      self.assertEqual(self.ls.list_owned(), set(["one"]))
+      self.assertFalse(self.ls._get_lock().is_owned())
+
+      self.ls.release()
+      self.assertFalse(self.ls.list_owned())
+
+      # The lock held by the main thread can't be acquired without waiting
+      result = self.ls.acquire(["two"], shared=0, opportunistic=True)
+      self.assertFalse(self.ls._get_lock().is_owned())
+      self.assertFalse(result)
+      self.assertFalse(self.ls.list_owned())
+
+      # Try to acquire all locks; "two" is skipped as it's still held
+      result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
+      self.assertTrue(self.ls._get_lock().is_owned(),
+                      msg="Internal lock is not owned")
+      self.assertEqual(result, set(["one", "three"]))
+      self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+      self.ls.release()
+
+      self.assertFalse(self.ls.list_owned())
+
+      self.done.put(True)
+
+    self._addThread(target=fn)
+
+    # Wait for threads to finish
+    self._waitThreads()
+
+    self.assertEqual(self.ls.list_owned(), set(["two"]))
+
+    self.ls.release()
+    self.assertFalse(self.ls.list_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+
+    self.assertTrue(self.done.get_nowait())
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  def testOpportunisticAcquisitionWithoutNamesExpires(self):
+    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+    # Hold all locks in main thread
+    self.ls.acquire(locking.ALL_SET, shared=0)
+    self.assertTrue(self.ls._get_lock().is_owned())
+
+    def fn():
+      # Internal lock is held by main thread, so this times out
+      result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
+                               timeout=0.1)
+      self.assertFalse(result)
+      self.assertFalse(self.ls._get_lock().is_owned())
+      self.assertFalse(self.ls.list_owned())
+
+      # Try a single lock without a timeout; it's held by main thread
+      self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+      self.done.put(True)
+
+    self._addThread(target=fn)
+
+    # Wait for threads to finish
+    self._waitThreads()
+
+    self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+    self.ls.release()
+    self.assertFalse(self.ls.list_owned())
+    self.assertFalse(self.ls._get_lock().is_owned(shared=0))
+
+    self.assertTrue(self.done.get_nowait())
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  def testSharedOpportunisticAcquisitionWithoutNames(self):
+    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+    # Hold all locks in shared mode in main thread
+    self.ls.acquire(locking.ALL_SET, shared=1)
+    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+
+    def fn():
+      # Try to acquire all locks in separate thread in shared mode
+      result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
+                               timeout=0.1)
+      self.assertEqual(result, set(["one", "two", "three"]))
+      self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+      self.ls.release()
+      self.assertFalse(self.ls._get_lock().is_owned())
+
+      # Try one lock exclusively; fails as main thread holds it shared
+      self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
+
+      self.done.put(True)
+
+    self._addThread(target=fn)
+
+    # Wait for threads to finish
+    self._waitThreads()
+
+    self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
+
+    self.ls.release()
+    self.assertFalse(self.ls.list_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+
+    self.assertTrue(self.done.get_nowait())
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  def testLockDeleteWithOpportunisticAcquisition(self):
+    # This test exercises some code handling LockError on acquisition, that is
+    # after all lock names have been gathered. This shouldn't happen in reality
+    # as removing locks from the set requires the lockset-internal lock, but
+    # the code should handle the situation anyway.
+    ready = threading.Event()
+    finished = threading.Event()
+
+    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
+
+    # Thread function to delete lock
+    def fn():
+      # Wait for notification
+      ready.wait()
+
+      # Delete lock named "two" by accessing lockset-internal data
+      ld = self.ls._get_lockdict()
+      self.assertTrue(ld["two"].delete())
+
+      self.done.put("deleted.two")
+
+      # Notify helper
+      finished.set()
+
+    self._addThread(target=fn)
+
+    # Notification helper, called when lock already holds internal lock.
+    # Therefore only one of the locks not yet locked can be deleted.
+    def notify(name):
+      self.done.put("notify.%s" % name)
+
+      if name == "one":
+        # Tell helper thread to delete lock "two"
+        ready.set()
+        finished.wait()
+
+    # Opportunistically acquire all locks (exclusively) in main thread
+    self.ls.acquire(locking.ALL_SET, opportunistic=True, test_notify=notify)
+    self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
+
+    # Wait for threads to finish
+    self._waitThreads()
+
+    # Release all locks
+    self.ls.release()
+    self.assertFalse(self.ls.list_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+
+    self.assertEqual(self.done.get_nowait(), "notify.one")
+    self.assertEqual(self.done.get_nowait(), "deleted.two")
+    self.assertEqual(self.done.get_nowait(), "notify.three")
+    self.assertEqual(self.done.get_nowait(), "notify.two")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
+  def setUp(self):
+    self.fn = locking._GetLsAcquireModeAndTimeouts
+
+  def testOpportunisticWithNames(self):
+    (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
+    self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+    self.assertTrue(ls_timeout_fn is None)
+    self.assertEqual(timeout_fn(), 0)
+
+  def testAllInputCombinations(self):
+    for want_all in [False, True]:
+      for timeout in [None, 0, 100]:
+        for opportunistic in [False, True]:
+          if (opportunistic and
+              not want_all and
+              timeout is not None):
+            # Can't accept a timeout when acquiring named locks opportunistically
+            self.assertRaises(AssertionError, self.fn,
+                              want_all, timeout, opportunistic)
+          else:
+            (mode, ls_timeout_fn, timeout_fn) = \
+              self.fn(want_all, timeout, opportunistic)
+
+            if opportunistic:
+              self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
+              self.assertEqual(timeout_fn(), 0)
+            else:
+              self.assertTrue(callable(timeout_fn))
+              if want_all:
+                self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
+              else:
+                self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
+
+            if want_all:
+              self.assertTrue(callable(ls_timeout_fn))
+            else:
+              self.assertTrue(ls_timeout_fn is None)
+
 
 class TestGanetiLockManager(_ThreadedTestCase):
   def setUp(self):