locking.LockSet: Implement acquire timeouts
author Michael Hanselmann <hansmi@google.com>
Fri, 9 Oct 2009 11:39:17 +0000 (13:39 +0200)
committer Michael Hanselmann <hansmi@google.com>
Mon, 12 Oct 2009 10:46:58 +0000 (12:46 +0200)
The timeout passed to LockSet.acquire() is measured over all lock acquires. If
LockSet.acquire fails to acquire all requested locks within the specified
amount of time, all locks are released again and the acquire fails.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>

lib/cmdlib.py
lib/locking.py
test/ganeti.locking_unittest.py

index a5e2531..0dbdba1 100644 (file)
@@ -2396,7 +2396,6 @@ class LUQueryNodes(NoHooksLU):
       # if we don't request only static fields, we need to lock the nodes
       self.needed_locks[locking.LEVEL_NODE] = self.wanted
 
-
   def CheckPrereq(self):
     """Check prerequisites.
 
index 862e66d..7980e50 100644 (file)
@@ -632,6 +632,12 @@ class SharedLock(object):
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
@@ -702,6 +708,12 @@ class LockSet:
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      self.__lockdict[lname].release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
@@ -730,30 +742,43 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, timeout=None, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
-    @type timeout: float
+    @type timeout: float or None
     @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
-    if timeout is not None:
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    if names is None:
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    remaining_timeout = timeout
+    if timeout is None:
+      start = None
+      calc_remaining_timeout = lambda: None
+    else:
+      start = time.time()
+      calc_remaining_timeout = lambda: (start + timeout) - time.time()
+
+    want_all = names is None
+
+    if want_all:
       # If no names are given acquire the whole set by not letting new names
       # being added before we release, and getting the current list of names.
       # Some of them may then be deleted later, but we'll cope with this.
@@ -763,7 +788,7 @@ class LockSet:
       # them exclusively though they won't be able to do this anyway, though,
       # so we'll get the list lock exclusively as well in order to be able to
       # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
+      self.__lock.acquire(shared=shared, timeout=remaining_timeout)
       try:
         # note we own the set-lock
         self._add_owned()
@@ -775,65 +800,103 @@ class LockSet:
         self.__lock.release()
         raise
 
+      # Re-calculate timeout
+      remaining_timeout = calc_remaining_timeout()
+
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
-      else:
-        names = sorted(names)
-
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
-        try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
-        try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
-          # now the lock cannot be deleted, we have it!
-          self._add_owned(name=lname)
-          acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
+      try:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+        else:
+          names = sorted(names)
+
+        acquire_list = []
+        # First we look the locks up on __lockdict. We have no way of being sure
+        # they will still be there after, but this makes it a lot faster should
+        # just one of them be the already wrong
+        for lname in utils.UniqueSequence(names):
+          try:
+            lock = self.__lockdict[lname] # raises KeyError if lock is not there
+            acquire_list.append((lname, lock))
+          except KeyError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+            else:
+              raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        # This will hold the locknames we effectively acquired.
+        acquired = set()
+
+        # Now acquire_list contains a sorted list of resources and locks we
+        # want.  In order to get them we loop on this (private) list and
+        # acquire() them.  We gave no real guarantee they will still exist till
+        # this is done but .acquire() itself is safe and will alert us if the
+        # lock gets deleted.
+        for (lname, lock) in acquire_list:
+          if __debug__ and callable(test_notify):
+            test_notify_fn = lambda: test_notify(lname)
           else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
-        except:
-          # We shouldn't have problems adding the lock to the owners list, but
-          # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
-          if lock._is_owned():
-            lock.release()
-          raise
+            test_notify_fn = None
 
-    except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
-      raise
+          try:
+            if timeout is not None and remaining_timeout < 0:
+              raise _AcquireTimeout()
+
+            # raises LockError if the lock was deleted
+            if not lock.acquire(shared=shared, timeout=remaining_timeout,
+                                test_notify=test_notify_fn):
+              # Couldn't get lock or timeout occurred
+              if timeout is None:
+                # This shouldn't happen as SharedLock.acquire(timeout=None) is
+                # blocking.
+                raise errors.LockError("Failed to get lock %s" % lname)
+
+              raise _AcquireTimeout()
+
+            # Re-calculate timeout
+            remaining_timeout = calc_remaining_timeout()
+
+            # now the lock cannot be deleted, we have it!
+            self._add_owned(name=lname)
+            acquired.add(lname)
+
+          except _AcquireTimeout:
+            # Release all acquired locks
+            self._release_and_delete_owned()
+            raise
+
+          except errors.LockError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+
+            self._release_and_delete_owned()
+
+            raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+          except:
+            # We shouldn't have problems adding the lock to the owners list, but
+            # if we did we'll try to release this lock and re-raise exception.
+            # Of course something is going to be really wrong, after this.
+            if lock._is_owned():
+              lock.release()
+            raise
+
+      except:
+        # If something went wrong and we had the set-lock let's release it...
+        if want_all:
+          self.__lock.release()
+        raise
+
+    except _AcquireTimeout:
+      if want_all:
+        self._del_owned()
+
+      return None
 
     return acquired
 
@@ -939,7 +1002,7 @@ class LockSet:
 
     @param names: names of the resource to remove.
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
         equal to the names list if we were holding all the locks
         exclusively
 
index 9483625..f3630b7 100755 (executable)
@@ -48,6 +48,15 @@ def _Repeat(fn):
   return wrapper
 
 
+def SafeSleep(duration):
+  start = time.time()
+  while True:
+    delay = start + duration - time.time()
+    if delay <= 0.0:
+      break
+    time.sleep(delay)
+
+
 class _ThreadedTestCase(unittest.TestCase):
   """Test class that supports adding/waiting on threads"""
   def setUp(self):
@@ -966,6 +975,125 @@ class TestLockSet(_ThreadedTestCase):
       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
 
   @_Repeat
+  def testSimpleAcquireTimeoutExpiring(self):
+    names = sorted(self.ls._names())
+    self.assert_(len(names) >= 3)
+
+    # Get name of first lock
+    first = names[0]
+
+    # Get name of last lock
+    last = names.pop()
+
+    checks = [
+      # Block first and try to lock it again
+      (first, first),
+
+      # Block last and try to lock all locks
+      (None, first),
+
+      # Block last and try to lock it again
+      (last, last),
+      ]
+
+    for (wanted, block) in checks:
+      # Lock in exclusive mode
+      self.assert_(self.ls.acquire(block, shared=0))
+
+      def _AcquireOne():
+        # Try to get the same lock again with a timeout (should never succeed)
+        if self.ls.acquire(wanted, timeout=0.1, shared=0):
+          self.done.put("acquired")
+          self.ls.release()
+        else:
+          self.assert_(not self.ls._list_owned())
+          self.assert_(not self.ls._is_owned())
+          self.done.put("not acquired")
+
+      self._addThread(target=_AcquireOne)
+
+      # Wait for timeout in thread to expire
+      self._waitThreads()
+
+      # Release exclusive lock again
+      self.ls.release()
+
+      self.assertEqual(self.done.get_nowait(), "not acquired")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
+  def testDelayedAndExpiringLockAcquire(self):
+    self._setUpLS()
+    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
+
+    for expire in (False, True):
+      names = sorted(self.ls._names())
+      self.assertEqual(len(names), 8)
+
+      lock_ev = dict([(i, threading.Event()) for i in names])
+
+      # Lock all in exclusive mode
+      self.assert_(self.ls.acquire(names, shared=0))
+
+      if expire:
+        # We'll wait at least 300ms per lock
+        lockwait = len(names) * [0.3]
+
+        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
+        # this gives us up to 2.4s to fail.
+        lockall_timeout = 0.4
+      else:
+        # This should finish rather quickly
+        lockwait = None
+        lockall_timeout = len(names) * 5.0
+
+      def _LockAll():
+        def acquire_notification(name):
+          if not expire:
+            self.done.put("getting %s" % name)
+
+          # Kick next lock
+          lock_ev[name].set()
+
+        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
+                           test_notify=acquire_notification):
+          self.done.put("got all")
+          self.ls.release()
+        else:
+          self.done.put("timeout on all")
+
+        # Notify all locks
+        for ev in lock_ev.values():
+          ev.set()
+
+      t = self._addThread(target=_LockAll)
+
+      for idx, name in enumerate(names):
+        # Wait for actual acquire on this lock to start
+        lock_ev[name].wait(10.0)
+
+        if expire and t.isAlive():
+          # Wait some time after getting the notification to make sure the lock
+          # acquire will expire
+          SafeSleep(lockwait[idx])
+
+        self.ls.release(names=name)
+
+      self.assert_(not self.ls._list_owned())
+
+      self._waitThreads()
+
+      if expire:
+        # Not checking which locks were actually acquired. Doing so would be
+        # too timing-dependant.
+        self.assertEqual(self.done.get_nowait(), "timeout on all")
+      else:
+        for i in names:
+          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
+        self.assertEqual(self.done.get_nowait(), "got all")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
   def testConcurrentRemove(self):
     self.ls.add('four')
     self.ls.acquire(['one', 'two', 'four'])