Add the delete() operation to SharedLock
author Guido Trotter <ultrotter@google.com>
Tue, 19 Feb 2008 13:50:57 +0000 (13:50 +0000)
committer Guido Trotter <ultrotter@google.com>
Tue, 19 Feb 2008 13:50:57 +0000 (13:50 +0000)
This new operation lets a lock be cleanly deleted. The lock will be exclusively
held before deletion, and after deletion all pending and future acquires will
raise an exception. Other SharedLock operations are modified to deal with
delete() and to avoid code duplication.

This patch also adds unit tests for the new function and its interaction with
the other lock features. The helper threads are slightly modified to handle and
report the condition of a deleted lock. As a bonus, an unrelated unit test
checking that non-blocking mode is not supported yet has been added as well.

This feature will be used by the LockSet in order to support deadlock-free
deletion of resources. This in turn will be useful to gracefully handle the
removal of instances and nodes from the cluster, dealing with the fact that
other operations may be pending on them.

Reviewed-by: iustinp

lib/locking.py
test/ganeti.locking_unittest.py

index 440eecf..b164ff3 100644 (file)
@@ -23,6 +23,9 @@
 # pylint: disable-msg=W0613,W0201
 
 import threading
+# Wouldn't it be better to define LockError in the locking module?
+# Well, for now that's how the rest of the code does it...
+from ganeti import errors
 
 
 class SharedLock:
@@ -52,6 +55,9 @@ class SharedLock:
     self.__nwait_exc = 0
     self.__nwait_shr = 0
 
+    # is this lock in the deleted state?
+    self.__deleted = False
+
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?"""
     return threading.currentThread() in self.__shr
@@ -92,6 +98,41 @@ class SharedLock:
 
     return result
 
+  def __wait(self,c):
+    """Wait on the given condition, then check whether the lock was deleted.
+
+    Args:
+      c: condition to wait on (callers pass e.g. __turn_shr or __turn_exc)
+    Raises:
+      errors.LockError: if the lock is marked deleted once the wait returns
+    """
+    c.wait()
+    if self.__deleted:
+      raise errors.LockError('deleted lock')
+
+  def __exclusive_acquire(self):
+    """Acquire the lock exclusively, waiting until it is free.
+
+    Private helper shared by acquire() and delete(); the caller must already
+    hold the internal lock. If the lock gets deleted while we are waiting,
+    __wait() raises errors.LockError and the lock is not acquired.
+
+    """
+    self.__nwait_exc += 1  # we count ourselves, hence the "> 1" test below
+    try:
+      # If anybody else is already waiting, wait for one turn first: this
+      # avoids a race condition that could theoretically starve the sharers.
+      if self.__nwait_shr > 0 or self.__nwait_exc > 1:
+        self.__wait(self.__turn_exc)
+
+      while len(self.__shr) > 0 or self.__exc is not None:
+        self.__wait(self.__turn_exc)
+
+      self.__exc = threading.currentThread()
+    finally:
+      self.__nwait_exc -= 1  # also runs when __wait() raised LockError
+
+
   def acquire(self, blocking=1, shared=0):
     """Acquire a shared lock.
 
@@ -108,6 +149,9 @@ class SharedLock:
 
     self.__lock.acquire()
     try:
+      if self.__deleted:
+        raise errors.LockError('deleted lock')
+
       # We cannot acquire the lock if we already have it
       assert not self.__is_owned(), "double acquire() on a non-recursive lock"
 
@@ -119,32 +163,20 @@ class SharedLock:
           # we'll just wait while there are no exclusive holders.
           if self.__nwait_exc > 0:
             # TODO: if !blocking...
-            self.__turn_shr.wait()
+            self.__wait(self.__turn_shr)
 
           while self.__exc is not None:
             # TODO: if !blocking...
-            self.__turn_shr.wait()
+            self.__wait(self.__turn_shr)
 
           self.__shr.add(threading.currentThread())
         finally:
           self.__nwait_shr -= 1
 
       else:
-        self.__nwait_exc += 1
-        try:
-          # This is to save ourselves from a nasty race condition that could
-          # theoretically make the sharers starve.
-          if self.__nwait_shr > 0 or self.__nwait_exc > 1:
-            # TODO: if !blocking...
-              self.__turn_exc.wait()
-
-          while len(self.__shr) > 0 or self.__exc is not None:
-            # TODO: if !blocking...
-            self.__turn_exc.wait()
-
-          self.__exc = threading.currentThread()
-        finally:
-          self.__nwait_exc -= 1
+        # TODO: if !blocking...
+        # (or modify __exclusive_acquire for non-blocking mode)
+        self.__exclusive_acquire()
 
     finally:
       self.__lock.release()
@@ -191,3 +223,39 @@ class SharedLock:
     finally:
       self.__lock.release()
 
+  def delete(self, blocking=1):
+    """Delete a SharedLock.
+
+    The calling thread must not hold the lock in shared mode. The lock is
+    first acquired exclusively (unless already held that way by the caller),
+    then marked deleted and left unowned; all waiters are woken, and pending
+    and future acquire()/delete() calls raise errors.LockError.
+
+    Args:
+      blocking: whether to block while acquiring; non-blocking mode is not
+                supported yet and raises NotImplementedError unless the caller
+                already holds the lock exclusively
+    """
+    self.__lock.acquire()
+    try:
+      assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
+
+      if self.__deleted:
+        raise errors.LockError('deleted lock')
+
+      if not self.__is_exclusive():
+        if not blocking:
+          # We don't have non-blocking mode for now
+          raise NotImplementedError
+        self.__exclusive_acquire()
+
+      self.__deleted = True
+      self.__exc = None  # the deleted lock is left unowned: don't release()
+      # Wake up everybody, they will fail acquiring the lock and
+      # raise an exception instead.
+      self.__turn_exc.notifyAll()
+      self.__turn_shr.notifyAll()
+
+    finally:
+      self.__lock.release()
+
index 4ed355e..8947093 100755 (executable)
@@ -28,6 +28,7 @@ import time
 import Queue
 
 from ganeti import locking
+from ganeti import errors
 from threading import Thread
 
 
@@ -86,14 +87,28 @@ class TestSharedLock(unittest.TestCase):
   # helper functions: called in a separate thread they acquire the lock, send
   # their identifier on the done queue, then release it.
   def _doItSharer(self):
-    self.sl.acquire(shared=1)
-    self.done.put('SHR')
-    self.sl.release()
+    try:
+      self.sl.acquire(shared=1)
+      self.done.put('SHR')
+      self.sl.release()
+    except errors.LockError:
+      self.done.put('ERR')
 
   def _doItExclusive(self):
-    self.sl.acquire()
-    self.done.put('EXC')
-    self.sl.release()
+    try:
+      self.sl.acquire()
+      self.done.put('EXC')
+      self.sl.release()
+    except errors.LockError:
+      self.done.put('ERR')
+
+  def _doItDelete(self):
+    try:
+      self.sl.delete()
+      self.done.put('DEL')
+      # a deleted lock is left unowned, so there is nothing to release()
+    except errors.LockError:
+      self.done.put('ERR')
 
   def testSharersCanCoexist(self):
     self.sl.acquire(shared=1)
@@ -109,6 +124,14 @@ class TestSharedLock(unittest.TestCase):
     self.sl.release()
     self.assert_(self.done.get(True, 1))
 
+  def testExclusiveBlocksDelete(self):
+    self.sl.acquire()
+    Thread(target=self._doItDelete).start()
+    # the deleter must stay blocked while we hold the lock exclusively
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    self.assertEqual(self.done.get(True, 1), 'DEL')
+
   def testExclusiveBlocksSharer(self):
     self.sl.acquire()
     Thread(target=self._doItSharer).start()
@@ -125,6 +148,14 @@ class TestSharedLock(unittest.TestCase):
     self.sl.release()
     self.assert_(self.done.get(True, 1))
 
+  def testSharerBlocksDelete(self):
+    self.sl.acquire(shared=1)
+    Thread(target=self._doItDelete).start()
+    time.sleep(0.05)  # let the deleter start and block on our shared hold
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    self.assertEqual(self.done.get(True, 1), 'DEL')
+
   def testWaitingExclusiveBlocksSharer(self):
     self.sl.acquire(shared=1)
     # the lock is acquired in shared mode...
@@ -153,6 +184,47 @@ class TestSharedLock(unittest.TestCase):
     self.assertEqual(self.done.get(True, 1), 'SHR')
     self.assertEqual(self.done.get(True, 1), 'EXC')
 
+  def testNoNonBlocking(self):
+    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
+    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
+    self.sl.acquire()
+    self.sl.delete(blocking=0)  # OK: we already hold the lock exclusively
+
+  def testDelete(self):
+    self.sl.delete()
+    for op in (self.sl.acquire, self.sl.delete):
+      self.assertRaises(errors.LockError, op)
+
+  def testDeletePendingSharersExclusiveDelete(self):
+    self.sl.acquire()
+    Thread(target=self._doItSharer).start()
+    Thread(target=self._doItSharer).start()
+    time.sleep(0.05)
+    Thread(target=self._doItExclusive).start()
+    Thread(target=self._doItDelete).start()
+    time.sleep(0.05)
+    self.sl.delete()  # we hold the lock exclusively, so this doesn't block
+    # All four pending threads (two sharers, exclusive, deleter) get ERR
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+
+  def testDeletePendingDeleteExclusiveSharers(self):
+    self.sl.acquire()
+    Thread(target=self._doItDelete).start()
+    Thread(target=self._doItExclusive).start()
+    time.sleep(0.05)
+    Thread(target=self._doItSharer).start()
+    Thread(target=self._doItSharer).start()
+    time.sleep(0.05)
+    self.sl.delete()  # we hold the lock exclusively, so this doesn't block
+    # All four pending threads (deleter, exclusive, two sharers) get ERR
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+
 
 if __name__ == '__main__':
   unittest.main()