Adding a runtime configuration library
[ganeti-local] / test / ganeti.locking_unittest.py
index c1ea313..273e817 100755 (executable)
@@ -166,20 +166,22 @@ class TestPipeCondition(_ConditionTestCase):
     self._testNotification()
 
   def _TestWait(self, fn):
-    self._addThread(target=fn)
-    self._addThread(target=fn)
-    self._addThread(target=fn)
+    threads = [
+      self._addThread(target=fn),
+      self._addThread(target=fn),
+      self._addThread(target=fn),
+      ]
 
     # Wait for threads to be waiting
-    self.assertEqual(self.done.get(True, 1), "A")
-    self.assertEqual(self.done.get(True, 1), "A")
-    self.assertEqual(self.done.get(True, 1), "A")
+    for _ in threads:
+      self.assertEqual(self.done.get(True, 1), "A")
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
     self.cond.acquire()
-    self.assertEqual(self.cond._nwaiters, 3)
-    # This new thread can"t acquire the lock, and thus call wait, before we
+    self.assertEqual(len(self.cond._waiters), 3)
+    self.assertEqual(self.cond._waiters, set(threads))
+    # This new thread can't acquire the lock, and thus call wait, before we
     # release it
     self._addThread(target=fn)
     self.cond.notifyAll()
@@ -1438,6 +1440,9 @@ class TestLockMonitor(_ThreadedTestCase):
 
     self.assertEqual(len(self.lm._locks), len(locks))
 
+    self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
+                     100)
+
     # Delete all locks
     del locks[:]
 
@@ -1484,8 +1489,10 @@ class TestLockMonitor(_ThreadedTestCase):
     self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
 
     # Check query result
-    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
-                     [[name, None, None] for name in utils.NiceSort(expnames)])
+    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
+                                        False),
+                     [[name, None, None, []]
+                      for name in utils.NiceSort(expnames)])
 
     # Test exclusive acquire
     for tlock in locks[::4]:
@@ -1493,10 +1500,12 @@ class TestLockMonitor(_ThreadedTestCase):
       try:
         def _GetExpResult(name):
           if tlock.name == name:
-            return [name, "exclusive", [threading.currentThread().getName()]]
-          return [name, None, None]
+            return [name, "exclusive", [threading.currentThread().getName()],
+                    []]
+          return [name, None, None, []]
 
-        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
+                                             "pending"], False),
                          [_GetExpResult(name)
                           for name in utils.NiceSort(expnames)])
       finally:
@@ -1591,6 +1600,73 @@ class TestLockMonitor(_ThreadedTestCase):
                      [[lock.name, "deleted", None]])
     self.assertEqual(len(self.lm._locks), 1)
 
+  def testPending(self):
+    def _Acquire(lock, shared, prev, next):
+      prev.wait()
+      # test_notify sets "next", unblocking the following thread in the chain
+      lock.acquire(shared=shared, test_notify=next.set)
+      try:
+        pass
+      finally:
+        lock.release()
+
+    lock = locking.SharedLock("ExcLock", monitor=self.lm)
+    # First pass queues exclusive acquires, second pass shared ones
+    for shared in [0, 1]:
+      lock.acquire()
+      try:
+        self.assertEqual(len(self.lm._locks), 1)
+        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+                         [[lock.name, "exclusive",
+                           [threading.currentThread().getName()]]])
+
+        threads = []
+        # Each thread waits for the previous thread's event before acquiring
+        first = threading.Event()
+        prev = first
+
+        for i in range(5):
+          ev = threading.Event()
+          threads.append(self._addThread(target=_Acquire,
+                                          args=(lock, shared, prev, ev)))
+          prev = ev
+
+        # Start acquires
+        first.set()
+
+        # Wait for last acquire to start waiting
+        prev.wait()
+
+        # NOTE: This works only because QueryLocks will acquire the
+        # lock-internal lock again and won't be able to get the information
+        # until it has the lock. By then the acquire should be registered in
+        # SharedLock.__pending (otherwise it's a bug).
+
+        # All acquires are waiting now (shared ones expected as one entry)
+        if shared:
+          pending = [("shared", sorted([t.getName() for t in threads]))]
+        else:
+          pending = [("exclusive", [t.getName()]) for t in threads]
+
+        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
+                                             "pending"], False),
+                         [[lock.name, "exclusive",
+                           [threading.currentThread().getName()],
+                           pending]])
+
+        self.assertEqual(len(self.lm._locks), 1)
+      finally:
+        lock.release()
+
+      self._waitThreads()
+
+      # Once _waitThreads() returns: no mode, no owner, no pending acquires
+      self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
+                                          False),
+                       [[lock.name, None, None, []]])
+
+      self.assertEqual(len(self.lm._locks), 1)
+
 
 if __name__ == '__main__':
   testutils.GanetiTestProgram()