Convert “gnt-debug locks” to query2
[ganeti-local] / test / ganeti.locking_unittest.py
index 170e819..3364c88 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -27,16 +27,23 @@ import unittest
 import time
 import Queue
 import threading
+import random
+import itertools
 
+from ganeti import constants
 from ganeti import locking
 from ganeti import errors
+from ganeti import utils
+from ganeti import compat
+from ganeti import objects
+from ganeti import query
 
 import testutils
 
 
 # This is used to test the ssynchronize decorator.
 # Since it's passed as input to a decorator it must be declared as a global.
-_decoratorlock = locking.SharedLock()
+_decoratorlock = locking.SharedLock("decorator lock")
 
 #: List for looping tests
 ITERATIONS = range(8)
@@ -90,7 +97,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.cond = cls(self.lock)
 
   def _testAcquireRelease(self):
-    self.assert_(not self.cond._is_owned())
+    self.assertFalse(self.cond._is_owned())
     self.assertRaises(RuntimeError, self.cond.wait)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
@@ -100,7 +107,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.assert_(self.cond._is_owned())
     self.cond.release()
 
-    self.assert_(not self.cond._is_owned())
+    self.assertFalse(self.cond._is_owned())
     self.assertRaises(RuntimeError, self.cond.wait)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
@@ -122,7 +129,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.assertEqual(self.done.get(True, 1), "NN")
     self.assert_(self.cond._is_owned())
     self.cond.release()
-    self.assert_(not self.cond._is_owned())
+    self.assertFalse(self.cond._is_owned())
 
 
 class TestSingleNotifyPipeCondition(_ConditionTestCase):
@@ -164,20 +171,22 @@ class TestPipeCondition(_ConditionTestCase):
     self._testNotification()
 
   def _TestWait(self, fn):
-    self._addThread(target=fn)
-    self._addThread(target=fn)
-    self._addThread(target=fn)
+    threads = [
+      self._addThread(target=fn),
+      self._addThread(target=fn),
+      self._addThread(target=fn),
+      ]
 
     # Wait for threads to be waiting
-    self.assertEqual(self.done.get(True, 1), "A")
-    self.assertEqual(self.done.get(True, 1), "A")
-    self.assertEqual(self.done.get(True, 1), "A")
+    for _ in threads:
+      self.assertEqual(self.done.get(True, 1), "A")
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
     self.cond.acquire()
-    self.assertEqual(self.cond._nwaiters, 3)
-    # This new thread can"t acquire the lock, and thus call wait, before we
+    self.assertEqual(len(self.cond._waiters), 3)
+    self.assertEqual(self.cond._waiters, set(threads))
+    # This new thread can't acquire the lock, and thus call wait, before we
     # release it
     self._addThread(target=fn)
     self.cond.notifyAll()
@@ -256,28 +265,28 @@ class TestSharedLock(_ThreadedTestCase):
 
   def setUp(self):
     _ThreadedTestCase.setUp(self)
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock("TestSharedLock")
 
   def testSequenceAndOwnership(self):
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
     self.sl.acquire(shared=1)
     self.assert_(self.sl._is_owned())
     self.assert_(self.sl._is_owned(shared=1))
-    self.assert_(not self.sl._is_owned(shared=0))
+    self.assertFalse(self.sl._is_owned(shared=0))
     self.sl.release()
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
     self.sl.acquire()
     self.assert_(self.sl._is_owned())
-    self.assert_(not self.sl._is_owned(shared=1))
+    self.assertFalse(self.sl._is_owned(shared=1))
     self.assert_(self.sl._is_owned(shared=0))
     self.sl.release()
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
     self.sl.acquire(shared=1)
     self.assert_(self.sl._is_owned())
     self.assert_(self.sl._is_owned(shared=1))
-    self.assert_(not self.sl._is_owned(shared=0))
+    self.assertFalse(self.sl._is_owned(shared=0))
     self.sl.release()
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
 
   def testBooleanValue(self):
     # semaphores are supposed to return a true value on a successful acquire
@@ -350,7 +359,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.sl.release()
     self._waitThreads()
     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testExclusiveBlocksSharer(self):
@@ -378,7 +387,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.sl.release()
     self._waitThreads()
     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testWaitingExclusiveBlocksSharer(self):
@@ -441,7 +450,7 @@ class TestSharedLock(_ThreadedTestCase):
     # The threads who were pending return ERR
     for _ in range(4):
       self.assertEqual(self.done.get_nowait(), 'ERR')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testDeletePendingDeleteExclusiveSharers(self):
@@ -457,7 +466,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.assertEqual(self.done.get_nowait(), 'ERR')
     self.assertEqual(self.done.get_nowait(), 'ERR')
     self.assertEqual(self.done.get_nowait(), 'ERR')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testExclusiveAcquireTimeout(self):
@@ -697,6 +706,146 @@ class TestSharedLock(_ThreadedTestCase):
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
+  def testPriority(self):
+    # Acquire in exclusive mode
+    self.assert_(self.sl.acquire(shared=0))
+
+    # Queue acquires
+    def _Acquire(prev, next, shared, priority, result):
+      prev.wait()
+      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
+      try:
+        self.done.put(result)
+      finally:
+        self.sl.release()
+
+    counter = itertools.count(0)
+    priorities = range(-20, 30)
+    first = threading.Event()
+    prev = first
+
+    # Data structure:
+    # {
+    #   priority:
+    #     [(shared/exclusive, set(acquire names), set(pending threads)),
+    #      (shared/exclusive, ...),
+    #      ...,
+    #     ],
+    # }
+    perprio = {}
+
+    # References shared acquire per priority in L{perprio}. Data structure:
+    # {
+    #   priority: (shared=1, set(acquire names), set(pending threads)),
+    # }
+    prioshared = {}
+
+    for seed in [4979, 9523, 14902, 32440]:
+      # Use a deterministic random generator
+      rnd = random.Random(seed)
+      for priority in [rnd.choice(priorities) for _ in range(30)]:
+        modes = [0, 1]
+        rnd.shuffle(modes)
+        for shared in modes:
+          # Unique name
+          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
+
+          ev = threading.Event()
+          thread = self._addThread(target=_Acquire,
+                                   args=(prev, ev, shared, priority, acqname))
+          prev = ev
+
+          # Record expected acquire, see above for structure
+          data = (shared, set([acqname]), set([thread]))
+          priolist = perprio.setdefault(priority, [])
+          if shared:
+            priosh = prioshared.get(priority, None)
+            if priosh:
+              # Shared acquires are merged
+              for i, j in zip(priosh[1:], data[1:]):
+                i.update(j)
+              assert data[0] == priosh[0]
+            else:
+              prioshared[priority] = data
+              priolist.append(data)
+          else:
+            priolist.append(data)
+
+    # Start all acquires and wait for them
+    first.set()
+    prev.wait()
+
+    # Check lock information
+    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
+    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+                     (self.sl.name, "exclusive",
+                      [threading.currentThread().getName()], None))
+
+    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
+
+    # Let threads acquire the lock
+    self.sl.release()
+
+    # Wait for everything to finish
+    self._waitThreads()
+
+    self.assert_(self.sl._check_empty())
+
+    # Check acquires by priority
+    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
+      for (_, names, _) in acquires:
+        # For shared acquires, the set will contain 1..n entries. For exclusive
+        # acquires only one.
+        while names:
+          names.remove(self.done.get_nowait())
+      self.assertFalse(compat.any(names for (_, names, _) in acquires))
+
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
+    self.assertEqual(name, self.sl.name)
+    self.assert_(mode is None)
+    self.assert_(owner is None)
+
+    self.assertEqual([(pendmode, sorted(waiting))
+                      for (pendmode, waiting) in pending],
+                     [(["exclusive", "shared"][int(bool(shared))],
+                       sorted(t.getName() for t in threads))
+                      for acquires in [perprio[i]
+                                       for i in sorted(perprio.keys())]
+                      for (shared, _, threads) in acquires])
+
+
+class TestSharedLockInCondition(_ThreadedTestCase):
+  """SharedLock as a condition lock tests"""
+
+  def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    self.sl = locking.SharedLock("TestSharedLockInCondition")
+    self.setCondition()
+
+  def setCondition(self):
+    self.cond = threading.Condition(self.sl)
+
+  def testKeepMode(self):
+    self.cond.acquire(shared=1)
+    self.assert_(self.sl._is_owned(shared=1))
+    self.cond.wait(0)
+    self.assert_(self.sl._is_owned(shared=1))
+    self.cond.release()
+    self.cond.acquire(shared=0)
+    self.assert_(self.sl._is_owned(shared=0))
+    self.cond.wait(0)
+    self.assert_(self.sl._is_owned(shared=0))
+    self.cond.release()
+
+
+class TestSharedLockInPipeCondition(TestSharedLockInCondition):
+  """SharedLock as a pipe condition lock tests"""
+
+  def setCondition(self):
+    self.cond = locking.PipeCondition(self.sl)
+
 
 class TestSSynchronizedDecorator(_ThreadedTestCase):
   """Shared Lock Synchronized decorator test"""
@@ -716,9 +865,9 @@ class TestSSynchronizedDecorator(_ThreadedTestCase):
 
   def testDecoratedFunctions(self):
     self._doItExclusive()
-    self.assert_(not _decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock._is_owned())
     self._doItSharer()
-    self.assert_(not _decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock._is_owned())
 
   def testSharersCanCoexist(self):
     _decoratorlock.acquire(shared=1)
@@ -765,11 +914,11 @@ class TestLockSet(_ThreadedTestCase):
   def _setUpLS(self):
     """Helper to (re)initialize the lock set"""
     self.resources = ['one', 'two', 'three']
-    self.ls = locking.LockSet(members=self.resources)
+    self.ls = locking.LockSet(self.resources, "TestLockSet")
 
   def testResources(self):
     self.assertEquals(self.ls._names(), set(self.resources))
-    newls = locking.LockSet()
+    newls = locking.LockSet([], "TestLockSet.testResources")
     self.assertEquals(newls._names(), set())
 
   def testAcquireRelease(self):
@@ -995,8 +1144,8 @@ class TestLockSet(_ThreadedTestCase):
           self.ls.release()
         else:
           self.assert_(acquired is None)
-          self.assert_(not self.ls._list_owned())
-          self.assert_(not self.ls._is_owned())
+          self.assertFalse(self.ls._list_owned())
+          self.assertFalse(self.ls._is_owned())
           self.done.put("not acquired")
 
       self._addThread(target=_AcquireOne)
@@ -1068,7 +1217,7 @@ class TestLockSet(_ThreadedTestCase):
 
         self.ls.release(names=name)
 
-      self.assert_(not self.ls._list_owned())
+      self.assertFalse(self.ls._list_owned())
 
       self._waitThreads()
 
@@ -1224,15 +1373,67 @@ class TestLockSet(_ThreadedTestCase):
     self.assertEqual(self.done.get_nowait(), 'DONE')
     self._setUpLS()
 
+  def testPriority(self):
+    def _Acquire(prev, next, name, priority, success_fn):
+      prev.wait()
+      self.assert_(self.ls.acquire(name, shared=0,
+                                   priority=priority,
+                                   test_notify=lambda _: next.set()))
+      try:
+        success_fn()
+      finally:
+        self.ls.release()
+
+    # Get all in exclusive mode
+    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
+
+    done_two = Queue.Queue(0)
+
+    first = threading.Event()
+    prev = first
+
+    acquires = [("one", prio, self.done) for prio in range(1, 33)]
+    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
+
+    # Use a deterministic random generator
+    random.Random(741).shuffle(acquires)
+
+    for (name, prio, done) in acquires:
+      ev = threading.Event()
+      self._addThread(target=_Acquire,
+                      args=(prev, ev, name, prio,
+                            compat.partial(done.put, "Prio%s" % prio)))
+      prev = ev
+
+    # Start acquires
+    first.set()
+
+    # Wait for last acquire to start
+    prev.wait()
+
+    # Let threads acquire locks
+    self.ls.release()
+
+    # Wait for threads to finish
+    self._waitThreads()
+
+    for i in range(1, 33):
+      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
+      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
+
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self.assertRaises(Queue.Empty, done_two.get_nowait)
+
 
 class TestGanetiLockManager(_ThreadedTestCase):
 
   def setUp(self):
     _ThreadedTestCase.setUp(self)
     self.nodes=['n1', 'n2']
+    self.nodegroups=['g1', 'g2']
     self.instances=['i1', 'i2', 'i3']
-    self.GL = locking.GanetiLockManager(nodes=self.nodes,
-                                        instances=self.instances)
+    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
+                                        self.instances)
 
   def tearDown(self):
     # Don't try this at home...
@@ -1247,31 +1448,37 @@ class TestGanetiLockManager(_ThreadedTestCase):
       self.assertEqual(i, locking.LEVELS[i])
 
   def testDoubleGLFails(self):
-    self.assertRaises(AssertionError, locking.GanetiLockManager)
+    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
 
   def testLockNames(self):
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+                     set(self.nodegroups))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                      set(self.instances))
 
   def testInitAndResources(self):
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager()
+    self.GL = locking.GanetiLockManager([], [], [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
 
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager(nodes=self.nodes)
+    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+                                    set(self.nodegroups))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
 
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager(instances=self.instances)
+    self.GL = locking.GanetiLockManager([], [], self.instances)
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                      set(self.instances))
 
@@ -1279,13 +1486,17 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
+    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
     self.GL.release(locking.LEVEL_NODE, ['n2'])
     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_NODE)
     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+    self.GL.release(locking.LEVEL_NODEGROUP)
     self.GL.release(locking.LEVEL_INSTANCE)
     self.assertRaises(errors.LockError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i5'])
@@ -1298,11 +1509,16 @@ class TestGanetiLockManager(_ThreadedTestCase):
                       set(self.instances))
     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
                       set(self.instances))
+    self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
+                      set(self.nodegroups))
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
+                      set(self.nodegroups))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                       set(self.nodes))
     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
                       set(self.nodes))
     self.GL.release(locking.LEVEL_NODE)
+    self.GL.release(locking.LEVEL_NODEGROUP)
     self.GL.release(locking.LEVEL_INSTANCE)
     self.GL.release(locking.LEVEL_CLUSTER)
 
@@ -1325,6 +1541,8 @@ class TestGanetiLockManager(_ThreadedTestCase):
                       locking.LEVEL_NODE, ['n1', 'n2'])
     self.assertRaises(AssertionError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i3'])
+    self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_NODEGROUP, ['g1'])
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.GL.acquire(locking.LEVEL_NODE, ['n1'])
     self.assertRaises(AssertionError, self.GL.release,
@@ -1338,6 +1556,14 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.assertRaises(AssertionError, self.GL.release,
                       locking.LEVEL_CLUSTER)
     self.GL.release(locking.LEVEL_INSTANCE)
+    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
+    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER, ['BGL'])
+    self.assertRaises(AssertionError, self.GL.release,
+                      locking.LEVEL_CLUSTER)
+    self.GL.release(locking.LEVEL_NODEGROUP)
+    self.GL.release(locking.LEVEL_CLUSTER)
 
   def testWrongOrder(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
@@ -1345,8 +1571,28 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.assertRaises(AssertionError, self.GL.acquire,
                       locking.LEVEL_NODE, ['n1'])
     self.assertRaises(AssertionError, self.GL.acquire,
+                      locking.LEVEL_NODEGROUP, ['g1'])
+    self.assertRaises(AssertionError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i2'])
 
+  def testModifiableLevels(self):
+    self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
+                      ['BGL2'])
+    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
+    self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
+    self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
+    self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
+    self.GL.add(locking.LEVEL_NODE, ['n3'])
+    self.GL.remove(locking.LEVEL_NODE, ['n1'])
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
+    self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
+    self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
+    self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
+    self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
+                      ['BGL2'])
+
   # Helper function to run as a thread that shared the BGL and then acquires
   # some locks at another level.
   def _doLock(self, level, names, shared):
@@ -1391,5 +1637,283 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
 
 
+class TestLockMonitor(_ThreadedTestCase):
+  def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    self.lm = locking.LockMonitor()
+
+  def testSingleThread(self):
+    locks = []
+
+    for i in range(100):
+      name = "TestLock%s" % i
+      locks.append(locking.SharedLock(name, monitor=self.lm))
+
+    self.assertEqual(len(self.lm._locks), len(locks))
+    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+    self.assertEqual(len(result.fields), 1)
+    self.assertEqual(len(result.data), 100)
+
+    # Delete all locks
+    del locks[:]
+
+    # The garbage collector might need some time
+    def _CheckLocks():
+      if self.lm._locks:
+        raise utils.RetryAgain()
+
+    utils.Retry(_CheckLocks, 0.1, 30.0)
+
+    self.assertFalse(self.lm._locks)
+
+  def testMultiThread(self):
+    locks = []
+
+    def _CreateLock(prev, next, name):
+      prev.wait()
+      locks.append(locking.SharedLock(name, monitor=self.lm))
+      if next:
+        next.set()
+
+    expnames = []
+
+    first = threading.Event()
+    prev = first
+
+    # Use a deterministic random generator
+    for i in random.Random(4263).sample(range(100), 33):
+      name = "MtTestLock%s" % i
+      expnames.append(name)
+
+      ev = threading.Event()
+      self._addThread(target=_CreateLock, args=(prev, ev, name))
+      prev = ev
+
+    # Add locks
+    first.set()
+    self._waitThreads()
+
+    # Check order in which locks were added
+    self.assertEqual([i.name for i in locks], expnames)
+
+    # Check query result
+    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+    self.assert_(isinstance(result, dict))
+    response = objects.QueryResponse.FromDict(result)
+    self.assertEqual(response.data,
+                     [[(constants.QRFS_NORMAL, name),
+                       (constants.QRFS_NORMAL, None),
+                       (constants.QRFS_NORMAL, None),
+                       (constants.QRFS_NORMAL, [])]
+                      for name in utils.NiceSort(expnames)])
+    self.assertEqual(len(response.fields), 4)
+    self.assertEqual(["name", "mode", "owner", "pending"],
+                     [fdef.name for fdef in response.fields])
+
+    # Test exclusive acquire
+    for tlock in locks[::4]:
+      tlock.acquire(shared=0)
+      try:
+        def _GetExpResult(name):
+          if tlock.name == name:
+            return [(constants.QRFS_NORMAL, name),
+                    (constants.QRFS_NORMAL, "exclusive"),
+                    (constants.QRFS_NORMAL,
+                     [threading.currentThread().getName()]),
+                    (constants.QRFS_NORMAL, [])]
+          return [(constants.QRFS_NORMAL, name),
+                  (constants.QRFS_NORMAL, None),
+                  (constants.QRFS_NORMAL, None),
+                  (constants.QRFS_NORMAL, [])]
+
+        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [_GetExpResult(name)
+                          for name in utils.NiceSort(expnames)])
+      finally:
+        tlock.release()
+
+    # Test shared acquire
+    def _Acquire(lock, shared, ev, notify):
+      lock.acquire(shared=shared)
+      try:
+        notify.set()
+        ev.wait()
+      finally:
+        lock.release()
+
+    for tlock1 in locks[::11]:
+      for tlock2 in locks[::-15]:
+        if tlock2 == tlock1:
+          # Avoid deadlocks
+          continue
+
+        for tlock3 in locks[::10]:
+          if tlock3 in (tlock2, tlock1):
+            # Avoid deadlocks
+            continue
+
+          releaseev = threading.Event()
+
+          # Acquire locks
+          acquireev = []
+          tthreads1 = []
+          for i in range(3):
+            ev = threading.Event()
+            tthreads1.append(self._addThread(target=_Acquire,
+                                             args=(tlock1, 1, releaseev, ev)))
+            acquireev.append(ev)
+
+          ev = threading.Event()
+          tthread2 = self._addThread(target=_Acquire,
+                                     args=(tlock2, 1, releaseev, ev))
+          acquireev.append(ev)
+
+          ev = threading.Event()
+          tthread3 = self._addThread(target=_Acquire,
+                                     args=(tlock3, 0, releaseev, ev))
+          acquireev.append(ev)
+
+          # Wait for all locks to be acquired
+          for i in acquireev:
+            i.wait()
+
+          # Check query result
+          result = self.lm.QueryLocks(["name", "mode", "owner"])
+          response = objects.QueryResponse.FromDict(result)
+          for (name, mode, owner) in response.data:
+            (name_status, name_value) = name
+            (owner_status, owner_value) = owner
+
+            self.assertEqual(name_status, constants.QRFS_NORMAL)
+            self.assertEqual(owner_status, constants.QRFS_NORMAL)
+
+            if name_value == tlock1.name:
+              self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
+              self.assertEqual(set(owner_value),
+                               set(i.getName() for i in tthreads1))
+              continue
+
+            if name_value == tlock2.name:
+              self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
+              self.assertEqual(owner_value, [tthread2.getName()])
+              continue
+
+            if name_value == tlock3.name:
+              self.assertEqual(mode, (constants.QRFS_NORMAL, "exclusive"))
+              self.assertEqual(owner_value, [tthread3.getName()])
+              continue
+
+            self.assert_(name_value in expnames)
+            self.assertEqual(mode, (constants.QRFS_NORMAL, None))
+            self.assert_(owner_value is None)
+
+          # Release locks again
+          releaseev.set()
+
+          self._waitThreads()
+
+          result = self.lm.QueryLocks(["name", "mode", "owner"])
+          self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                           [[(constants.QRFS_NORMAL, name),
+                             (constants.QRFS_NORMAL, None),
+                             (constants.QRFS_NORMAL, None)]
+                            for name in utils.NiceSort(expnames)])
+
+  def testDelete(self):
+    lock = locking.SharedLock("TestLock", monitor=self.lm)
+
+    self.assertEqual(len(self.lm._locks), 1)
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.QRFS_NORMAL, lock.name),
+                       (constants.QRFS_NORMAL, None),
+                       (constants.QRFS_NORMAL, None)]])
+
+    lock.delete()
+
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.QRFS_NORMAL, lock.name),
+                       (constants.QRFS_NORMAL, "deleted"),
+                       (constants.QRFS_NORMAL, None)]])
+    self.assertEqual(len(self.lm._locks), 1)
+
+  def testPending(self):
+    def _Acquire(lock, shared, prev, next):
+      prev.wait()
+
+      lock.acquire(shared=shared, test_notify=next.set)
+      try:
+        pass
+      finally:
+        lock.release()
+
+    lock = locking.SharedLock("ExcLock", monitor=self.lm)
+
+    for shared in [0, 1]:
+      lock.acquire()
+      try:
+        self.assertEqual(len(self.lm._locks), 1)
+        result = self.lm.QueryLocks(["name", "mode", "owner"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [[(constants.QRFS_NORMAL, lock.name),
+                           (constants.QRFS_NORMAL, "exclusive"),
+                           (constants.QRFS_NORMAL,
+                            [threading.currentThread().getName()])]])
+
+        threads = []
+
+        first = threading.Event()
+        prev = first
+
+        for i in range(5):
+          ev = threading.Event()
+          threads.append(self._addThread(target=_Acquire,
+                                          args=(lock, shared, prev, ev)))
+          prev = ev
+
+        # Start acquires
+        first.set()
+
+        # Wait for last acquire to start waiting
+        prev.wait()
+
+        # NOTE: This works only because QueryLocks will acquire the
+        # lock-internal lock again and won't be able to get the information
+        # until it has the lock. By then the acquire should be registered in
+        # SharedLock.__pending (otherwise it's a bug).
+
+        # All acquires are waiting now
+        if shared:
+          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
+        else:
+          pending = [("exclusive", [t.getName()]) for t in threads]
+
+        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [[(constants.QRFS_NORMAL, lock.name),
+                           (constants.QRFS_NORMAL, "exclusive"),
+                           (constants.QRFS_NORMAL,
+                            [threading.currentThread().getName()]),
+                           (constants.QRFS_NORMAL, pending)]])
+
+        self.assertEqual(len(self.lm._locks), 1)
+      finally:
+        lock.release()
+
+      self._waitThreads()
+
+      # No pending acquires
+      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                       [[(constants.QRFS_NORMAL, lock.name),
+                         (constants.QRFS_NORMAL, None),
+                         (constants.QRFS_NORMAL, None),
+                         (constants.QRFS_NORMAL, [])]])
+
+      self.assertEqual(len(self.lm._locks), 1)
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()