locking: Handle spurious notifications on lock acquire
authorMichael Hanselmann <hansmi@google.com>
Wed, 21 Mar 2012 13:55:22 +0000 (14:55 +0100)
committerMichael Hanselmann <hansmi@google.com>
Thu, 22 Mar 2012 13:03:27 +0000 (14:03 +0100)
This was already a TODO since the implementation of lock priorities in
September 2010. Under certain conditions a waiting acquire can be
notified at a time when it can't actually get the lock. In this case it
would try and fail to acquire the lock and then return to the caller
before the timeout ends.

While this is not bad (nothing breaks), it isn't nice either. A separate
patch will prevent unnecessary notifications when shared locks are
released.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

lib/locking.py
test/ganeti.locking_unittest.py

index a0e418e..5d5a01d 100644 (file)
@@ -33,6 +33,7 @@ import weakref
 import logging
 import heapq
 import itertools
+import time
 
 from ganeti import errors
 from ganeti import utils
@@ -404,12 +405,13 @@ class SharedLock(object):
     "__pending_by_prio",
     "__pending_shared",
     "__shr",
+    "__time_fn",
     "name",
     ]
 
   __condition_class = _PipeConditionWithMode
 
-  def __init__(self, name, monitor=None):
+  def __init__(self, name, monitor=None, _time_fn=time.time):
     """Construct a new SharedLock.
 
     @param name: the name of the lock
@@ -421,6 +423,9 @@ class SharedLock(object):
 
     self.name = name
 
+    # Used for unittesting
+    self.__time_fn = _time_fn
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -682,24 +687,27 @@ class SharedLock(object):
         assert priority not in self.__pending_shared
         self.__pending_shared[priority] = wait_condition
 
+    wait_start = self.__time_fn()
+    acquired = False
+
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
-      # TODO: Decrease timeout with spurious notifications
-      while not (self.__is_on_top(wait_condition) and
-                 self.__can_acquire(shared)):
-        # Wait for notification
-        wait_condition.wait(timeout)
-        self.__check_deleted()
+      while True:
+        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+          self.__do_acquire(shared)
+          acquired = True
+          break
 
-        # A lot of code assumes blocking acquires always succeed. Loop
-        # internally for that case.
-        if timeout is not None:
+        # A lot of code assumes blocking acquires always succeed, therefore we
+        # can never return False for a blocking acquire
+        if (timeout is not None and
+            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
           break
 
-      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
-        self.__do_acquire(shared)
-        return True
+        # Wait for notification
+        wait_condition.wait(timeout)
+        self.__check_deleted()
     finally:
       # Remove condition from queue if there are no more waiters
       if not wait_condition.has_waiting():
@@ -709,7 +717,7 @@ class SharedLock(object):
           # (e.g. on lock deletion)
           self.__pending_shared.pop(priority, None)
 
-    return False
+    return acquired
 
   def acquire(self, shared=0, timeout=None, priority=None,
               test_notify=None):
@@ -800,15 +808,30 @@ class SharedLock(object):
         self.__shr.remove(threading.currentThread())
 
       # Notify topmost condition in queue
-      (priority, prioqueue) = self.__find_first_pending_queue()
-      if prioqueue:
-        cond = prioqueue[0]
-        cond.notifyAll()
-        if cond.shared:
-          # Prevent further shared acquires from sneaking in while waiters are
-          # notified
-          self.__pending_shared.pop(priority, None)
+      self.__notify_topmost()
+    finally:
+      self.__lock.release()
+
+  def __notify_topmost(self):
+    """Notifies topmost condition in queue of pending acquires.
+
+    """
+    (priority, prioqueue) = self.__find_first_pending_queue()
+    if prioqueue:
+      cond = prioqueue[0]
+      cond.notifyAll()
+      if cond.shared:
+        # Prevent further shared acquires from sneaking in while waiters are
+        # notified
+        self.__pending_shared.pop(priority, None)
+
+  def _notify_topmost(self):
+    """Exported version of L{__notify_topmost}.
 
+    """
+    self.__lock.acquire()
+    try:
+      return self.__notify_topmost()
     finally:
       self.__lock.release()
 
index c8a6807..e48e74f 100755 (executable)
@@ -953,6 +953,84 @@ class TestSharedLock(_ThreadedTestCase):
                                        for i in sorted(perprio.keys())]
                       for (shared, _, threads) in acquires])
 
+  class _FakeTimeForSpuriousNotifications:
+    def __init__(self, now, check_end):
+      self.now = now
+      self.check_end = check_end
+
+      # Deterministic random number generator
+      self.rnd = random.Random(15086)
+
+    def time(self):
+      # Advance time if the random number generator thinks so (this is to test
+      # multiple notifications without advancing the time)
+      if self.rnd.random() < 0.3:
+        self.now += self.rnd.random()
+
+      self.check_end(self.now)
+
+      return self.now
+
+  @_Repeat
+  def testAcquireTimeoutWithSpuriousNotifications(self):
+    ready = threading.Event()
+    locked = threading.Event()
+    req = Queue.Queue(0)
+
+    epoch = 4000.0
+    timeout = 60.0
+
+    def check_end(now):
+      self.assertFalse(locked.isSet())
+
+      # If we waited long enough (in virtual time), tell main thread to release
+      # lock, otherwise tell it to notify once more
+      req.put(now < (epoch + (timeout * 0.8)))
+
+    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+    sl = locking.SharedLock("test", _time_fn=time_fn)
+
+    # Acquire in exclusive mode
+    sl.acquire(shared=0)
+
+    def fn():
+      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+                                 test_notify=ready.set))
+      locked.set()
+      sl.release()
+      self.done.put("success")
+
+    # Start acquire with timeout and wait for it to be ready
+    self._addThread(target=fn)
+    ready.wait()
+
+    # The separate thread is now waiting to acquire the lock, so start sending
+    # spurious notifications.
+
+    # Wait for separate thread to ask for another notification
+    count = 0
+    while req.get():
+      # After sending the notification, the lock will take a short amount of
+      # time to notice and to retrieve the current time
+      sl._notify_topmost()
+      count += 1
+
+    self.assertTrue(count > 100, "Not enough notifications were sent")
+
+    self.assertFalse(locked.isSet())
+
+    # Some notifications have been sent, now actually release the lock
+    sl.release()
+
+    # Wait for lock to be acquired
+    locked.wait()
+
+    self._waitThreads()
+
+    self.assertEqual(self.done.get_nowait(), "success")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
 
 class TestSharedLockInCondition(_ThreadedTestCase):
   """SharedLock as a condition lock tests"""