Add support for classic queries
[ganeti-local] / test / ganeti.locking_unittest.py
index 3284131..2706a72 100755 (executable)
@@ -16,7 +16,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 0.0510-1301, USA.
+# 02110-1301, USA.
 
 
 """Script for unittesting the locking module"""
 
 
 """Script for unittesting the locking module"""
@@ -28,12 +28,16 @@ import time
 import Queue
 import threading
 import random
 import Queue
 import threading
 import random
+import gc
 import itertools
 
 import itertools
 
+from ganeti import constants
 from ganeti import locking
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
 from ganeti import locking
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
+from ganeti import objects
+from ganeti import query
 
 import testutils
 
 
 import testutils
 
@@ -95,7 +99,7 @@ class _ConditionTestCase(_ThreadedTestCase):
 
   def _testAcquireRelease(self):
     self.assertFalse(self.cond._is_owned())
 
   def _testAcquireRelease(self):
     self.assertFalse(self.cond._is_owned())
-    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.wait, None)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
     self.cond.acquire()
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
     self.cond.acquire()
@@ -105,7 +109,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.cond.release()
 
     self.assertFalse(self.cond._is_owned())
     self.cond.release()
 
     self.assertFalse(self.cond._is_owned())
-    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.wait, None)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
   def _testNotification(self):
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
   def _testNotification(self):
@@ -121,7 +125,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self._addThread(target=_NotifyAll)
     self.assertEqual(self.done.get(True, 1), "NE")
     self.assertRaises(Queue.Empty, self.done.get_nowait)
     self._addThread(target=_NotifyAll)
     self.assertEqual(self.done.get(True, 1), "NE")
     self.assertRaises(Queue.Empty, self.done.get_nowait)
-    self.cond.wait()
+    self.cond.wait(None)
     self.assertEqual(self.done.get(True, 1), "NA")
     self.assertEqual(self.done.get(True, 1), "NN")
     self.assert_(self.cond._is_owned())
     self.assertEqual(self.done.get(True, 1), "NA")
     self.assertEqual(self.done.get(True, 1), "NN")
     self.assert_(self.cond._is_owned())
@@ -150,7 +154,7 @@ class TestSingleNotifyPipeCondition(_ConditionTestCase):
   def testNoNotifyReuse(self):
     self.cond.acquire()
     self.cond.notifyAll()
   def testNoNotifyReuse(self):
     self.cond.acquire()
     self.cond.notifyAll()
-    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.wait, None)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
     self.cond.release()
 
     self.assertRaises(RuntimeError, self.cond.notifyAll)
     self.cond.release()
 
@@ -216,7 +220,7 @@ class TestPipeCondition(_ConditionTestCase):
     def _BlockingWait():
       self.cond.acquire()
       self.done.put("A")
     def _BlockingWait():
       self.cond.acquire()
       self.done.put("A")
-      self.cond.wait()
+      self.cond.wait(None)
       self.cond.release()
       self.done.put("W")
 
       self.cond.release()
       self.done.put("W")
 
@@ -265,25 +269,25 @@ class TestSharedLock(_ThreadedTestCase):
     self.sl = locking.SharedLock("TestSharedLock")
 
   def testSequenceAndOwnership(self):
     self.sl = locking.SharedLock("TestSharedLock")
 
   def testSequenceAndOwnership(self):
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
     self.sl.acquire(shared=1)
     self.sl.acquire(shared=1)
-    self.assert_(self.sl._is_owned())
-    self.assert_(self.sl._is_owned(shared=1))
-    self.assertFalse(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned())
+    self.assert_(self.sl.is_owned(shared=1))
+    self.assertFalse(self.sl.is_owned(shared=0))
     self.sl.release()
     self.sl.release()
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
     self.sl.acquire()
     self.sl.acquire()
-    self.assert_(self.sl._is_owned())
-    self.assertFalse(self.sl._is_owned(shared=1))
-    self.assert_(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned())
+    self.assertFalse(self.sl.is_owned(shared=1))
+    self.assert_(self.sl.is_owned(shared=0))
     self.sl.release()
     self.sl.release()
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
     self.sl.acquire(shared=1)
     self.sl.acquire(shared=1)
-    self.assert_(self.sl._is_owned())
-    self.assert_(self.sl._is_owned(shared=1))
-    self.assertFalse(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned())
+    self.assert_(self.sl.is_owned(shared=1))
+    self.assertFalse(self.sl.is_owned(shared=0))
     self.sl.release()
     self.sl.release()
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
 
   def testBooleanValue(self):
     # semaphores are supposed to return a true value on a successful acquire
 
   def testBooleanValue(self):
     # semaphores are supposed to return a true value on a successful acquire
@@ -429,7 +433,31 @@ class TestSharedLock(_ThreadedTestCase):
     self.assertRaises(errors.LockError, self.sl.delete)
 
   def testDeleteTimeout(self):
     self.assertRaises(errors.LockError, self.sl.delete)
 
   def testDeleteTimeout(self):
-    self.sl.delete(timeout=60)
+    self.assertTrue(self.sl.delete(timeout=60))
+
+  def testDeleteTimeoutFail(self):
+    ready = threading.Event()
+    finish = threading.Event()
+
+    def fn():
+      self.sl.acquire(shared=0)
+      ready.set()
+
+      finish.wait()
+      self.sl.release()
+
+    self._addThread(target=fn)
+    ready.wait()
+
+    # Test if deleting a lock owned in exclusive mode by another thread fails
+    # to delete when a timeout is used
+    self.assertFalse(self.sl.delete(timeout=0.02))
+
+    finish.set()
+    self._waitThreads()
+
+    self.assertTrue(self.sl.delete())
+    self.assertRaises(errors.LockError, self.sl.acquire)
 
   def testNoDeleteIfSharer(self):
     self.sl.acquire(shared=1)
 
   def testNoDeleteIfSharer(self):
     self.sl.acquire(shared=1)
@@ -608,6 +636,117 @@ class TestSharedLock(_ThreadedTestCase):
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
+  def testIllegalDowngrade(self):
+    # Not yet acquired
+    self.assertRaises(AssertionError, self.sl.downgrade)
+
+    # Acquire in shared mode, downgrade should be no-op
+    self.assertTrue(self.sl.acquire(shared=1))
+    self.assertTrue(self.sl.is_owned(shared=1))
+    self.assertTrue(self.sl.downgrade())
+    self.assertTrue(self.sl.is_owned(shared=1))
+    self.sl.release()
+
+  def testDowngrade(self):
+    self.assertTrue(self.sl.acquire())
+    self.assertTrue(self.sl.is_owned(shared=0))
+    self.assertTrue(self.sl.downgrade())
+    self.assertTrue(self.sl.is_owned(shared=1))
+    self.sl.release()
+
+  @_Repeat
+  def testDowngradeJumpsAheadOfExclusive(self):
+    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
+      self.assertTrue(self.sl.acquire())
+      self.assertTrue(self.sl.is_owned(shared=0))
+      ev_got.set()
+      ev_downgrade.wait()
+      self.assertTrue(self.sl.is_owned(shared=0))
+      self.assertTrue(self.sl.downgrade())
+      self.assertTrue(self.sl.is_owned(shared=1))
+      ev_release.wait()
+      self.assertTrue(self.sl.is_owned(shared=1))
+      self.sl.release()
+
+    def _KeepExclusive2(ev_started, ev_release):
+      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
+      self.assertTrue(self.sl.is_owned(shared=0))
+      ev_release.wait()
+      self.assertTrue(self.sl.is_owned(shared=0))
+      self.sl.release()
+
+    def _KeepShared(ev_started, ev_got, ev_release):
+      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
+      self.assertTrue(self.sl.is_owned(shared=1))
+      ev_got.set()
+      ev_release.wait()
+      self.assertTrue(self.sl.is_owned(shared=1))
+      self.sl.release()
+
+    # Acquire lock in exclusive mode
+    ev_got_excl1 = threading.Event()
+    ev_downgrade_excl1 = threading.Event()
+    ev_release_excl1 = threading.Event()
+    th_excl1 = self._addThread(target=_KeepExclusive,
+                               args=(ev_got_excl1, ev_downgrade_excl1,
+                                     ev_release_excl1))
+    ev_got_excl1.wait()
+
+    # Start a second exclusive acquire
+    ev_started_excl2 = threading.Event()
+    ev_release_excl2 = threading.Event()
+    th_excl2 = self._addThread(target=_KeepExclusive2,
+                               args=(ev_started_excl2, ev_release_excl2))
+    ev_started_excl2.wait()
+
+    # Start shared acquires, will jump ahead of second exclusive acquire when
+    # first exclusive acquire downgrades
+    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
+    ev_release_shared = threading.Event()
+
+    th_shared = [self._addThread(target=_KeepShared,
+                                 args=(ev_started, ev_got, ev_release_shared))
+                 for (ev_started, ev_got) in ev_shared]
+
+    # Wait for all shared acquires to start
+    for (ev, _) in ev_shared:
+      ev.wait()
+
+    # Check lock information
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
+    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
+    self.assertEqual([(pendmode, sorted(waiting))
+                      for (pendmode, waiting) in pending],
+                     [("exclusive", [th_excl2.getName()]),
+                      ("shared", sorted(th.getName() for th in th_shared))])
+
+    # Shared acquires won't start until the exclusive lock is downgraded
+    ev_downgrade_excl1.set()
+
+    # Wait for all shared acquires to be successful
+    for (_, ev) in ev_shared:
+      ev.wait()
+
+    # Check lock information again
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
+                                              query.LQ_PENDING])),
+                     [(self.sl.name, "shared", None,
+                       [("exclusive", [th_excl2.getName()])])])
+    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
+    self.assertEqual(set(owner), set([th_excl1.getName()] +
+                                     [th.getName() for th in th_shared]))
+
+    ev_release_excl1.set()
+    ev_release_excl2.set()
+    ev_release_shared.set()
+
+    self._waitThreads()
+
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
+                                              query.LQ_PENDING])),
+                     [(self.sl.name, None, None, [])])
+
   @_Repeat
   def testMixedAcquireTimeout(self):
     sync = threading.Event()
   @_Repeat
   def testMixedAcquireTimeout(self):
     sync = threading.Event()
@@ -773,16 +912,14 @@ class TestSharedLock(_ThreadedTestCase):
     prev.wait()
 
     # Check lock information
     prev.wait()
 
     # Check lock information
-    self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
-    self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
-                     ["exclusive", [threading.currentThread().getName()]])
-    self.assertEqual(self.sl.GetInfo(["name", "pending"]),
-                     [self.sl.name,
-                      [(["exclusive", "shared"][int(bool(shared))],
-                        sorted([t.getName() for t in threads]))
-                       for acquires in [perprio[i]
-                                        for i in sorted(perprio.keys())]
-                       for (shared, _, threads) in acquires]])
+    self.assertEqual(self.sl.GetLockInfo(set()),
+                     [(self.sl.name, None, None, None)])
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+                     [(self.sl.name, "exclusive",
+                       [threading.currentThread().getName()], None)])
+
+    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
+                            perprio)
 
     # Let threads acquire the lock
     self.sl.release()
 
     # Let threads acquire the lock
     self.sl.release()
@@ -803,6 +940,97 @@ class TestSharedLock(_ThreadedTestCase):
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
+  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
+    self.assertEqual(name, self.sl.name)
+    self.assert_(mode is None)
+    self.assert_(owner is None)
+
+    self.assertEqual([(pendmode, sorted(waiting))
+                      for (pendmode, waiting) in pending],
+                     [(["exclusive", "shared"][int(bool(shared))],
+                       sorted(t.getName() for t in threads))
+                      for acquires in [perprio[i]
+                                       for i in sorted(perprio.keys())]
+                      for (shared, _, threads) in acquires])
+
+  class _FakeTimeForSpuriousNotifications:
+    def __init__(self, now, check_end):
+      self.now = now
+      self.check_end = check_end
+
+      # Deterministic random number generator
+      self.rnd = random.Random(15086)
+
+    def time(self):
+      # Advance time if the random number generator thinks so (this is to test
+      # multiple notifications without advancing the time)
+      if self.rnd.random() < 0.3:
+        self.now += self.rnd.random()
+
+      self.check_end(self.now)
+
+      return self.now
+
+  @_Repeat
+  def testAcquireTimeoutWithSpuriousNotifications(self):
+    ready = threading.Event()
+    locked = threading.Event()
+    req = Queue.Queue(0)
+
+    epoch = 4000.0
+    timeout = 60.0
+
+    def check_end(now):
+      self.assertFalse(locked.isSet())
+
+      # If we waited long enough (in virtual time), tell main thread to release
+      # lock, otherwise tell it to notify once more
+      req.put(now < (epoch + (timeout * 0.8)))
+
+    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+    sl = locking.SharedLock("test", _time_fn=time_fn)
+
+    # Acquire in exclusive mode
+    sl.acquire(shared=0)
+
+    def fn():
+      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+                                 test_notify=ready.set))
+      locked.set()
+      sl.release()
+      self.done.put("success")
+
+    # Start acquire with timeout and wait for it to be ready
+    self._addThread(target=fn)
+    ready.wait()
+
+    # The separate thread is now waiting to acquire the lock, so start sending
+    # spurious notifications.
+
+    # Wait for separate thread to ask for another notification
+    count = 0
+    while req.get():
+      # After sending the notification, the lock will take a short amount of
+      # time to notice and to retrieve the current time
+      sl._notify_topmost()
+      count += 1
+
+    self.assertTrue(count > 100, "Not enough notifications were sent")
+
+    self.assertFalse(locked.isSet())
+
+    # Some notifications have been sent, now actually release the lock
+    sl.release()
+
+    # Wait for lock to be acquired
+    locked.wait()
+
+    self._waitThreads()
+
+    self.assertEqual(self.done.get_nowait(), "success")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
 
 class TestSharedLockInCondition(_ThreadedTestCase):
   """SharedLock as a condition lock tests"""
 
 class TestSharedLockInCondition(_ThreadedTestCase):
   """SharedLock as a condition lock tests"""
@@ -817,14 +1045,14 @@ class TestSharedLockInCondition(_ThreadedTestCase):
 
   def testKeepMode(self):
     self.cond.acquire(shared=1)
 
   def testKeepMode(self):
     self.cond.acquire(shared=1)
-    self.assert_(self.sl._is_owned(shared=1))
+    self.assert_(self.sl.is_owned(shared=1))
     self.cond.wait(0)
     self.cond.wait(0)
-    self.assert_(self.sl._is_owned(shared=1))
+    self.assert_(self.sl.is_owned(shared=1))
     self.cond.release()
     self.cond.acquire(shared=0)
     self.cond.release()
     self.cond.acquire(shared=0)
-    self.assert_(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned(shared=0))
     self.cond.wait(0)
     self.cond.wait(0)
-    self.assert_(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned(shared=0))
     self.cond.release()
 
 
     self.cond.release()
 
 
@@ -843,19 +1071,19 @@ class TestSSynchronizedDecorator(_ThreadedTestCase):
 
   @locking.ssynchronized(_decoratorlock)
   def _doItExclusive(self):
 
   @locking.ssynchronized(_decoratorlock)
   def _doItExclusive(self):
-    self.assert_(_decoratorlock._is_owned())
+    self.assert_(_decoratorlock.is_owned())
     self.done.put('EXC')
 
   @locking.ssynchronized(_decoratorlock, shared=1)
   def _doItSharer(self):
     self.done.put('EXC')
 
   @locking.ssynchronized(_decoratorlock, shared=1)
   def _doItSharer(self):
-    self.assert_(_decoratorlock._is_owned(shared=1))
+    self.assert_(_decoratorlock.is_owned(shared=1))
     self.done.put('SHR')
 
   def testDecoratedFunctions(self):
     self._doItExclusive()
     self.done.put('SHR')
 
   def testDecoratedFunctions(self):
     self._doItExclusive()
-    self.assertFalse(_decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock.is_owned())
     self._doItSharer()
     self._doItSharer()
-    self.assertFalse(_decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock.is_owned())
 
   def testSharersCanCoexist(self):
     _decoratorlock.acquire(shared=1)
 
   def testSharersCanCoexist(self):
     _decoratorlock.acquire(shared=1)
@@ -909,27 +1137,61 @@ class TestLockSet(_ThreadedTestCase):
     newls = locking.LockSet([], "TestLockSet.testResources")
     self.assertEquals(newls._names(), set())
 
     newls = locking.LockSet([], "TestLockSet.testResources")
     self.assertEquals(newls._names(), set())
 
+  def testCheckOwnedUnknown(self):
+    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
+    for shared in [-1, 0, 1, 6378, 24255]:
+      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
+                                           shared=shared))
+
+  def testCheckOwnedUnknownWhileHolding(self):
+    self.assertFalse(self.ls.check_owned([]))
+    self.ls.acquire("one", shared=1)
+    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
+    self.assertTrue(self.ls.check_owned("one", shared=1))
+    self.assertFalse(self.ls.check_owned("one", shared=0))
+    self.assertFalse(self.ls.check_owned(["one", "two"]))
+    self.assertRaises(errors.LockError, self.ls.check_owned,
+                      ["one", "nonexist"])
+    self.assertRaises(errors.LockError, self.ls.check_owned, "")
+    self.ls.release()
+    self.assertFalse(self.ls.check_owned([]))
+    self.assertFalse(self.ls.check_owned("one"))
+
   def testAcquireRelease(self):
   def testAcquireRelease(self):
+    self.assertFalse(self.ls.check_owned(self.ls._names()))
     self.assert_(self.ls.acquire('one'))
     self.assert_(self.ls.acquire('one'))
-    self.assertEquals(self.ls._list_owned(), set(['one']))
+    self.assertEquals(self.ls.list_owned(), set(['one']))
+    self.assertTrue(self.ls.check_owned("one"))
+    self.assertTrue(self.ls.check_owned("one", shared=0))
+    self.assertFalse(self.ls.check_owned("one", shared=1))
     self.ls.release()
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
+    self.assertFalse(self.ls.check_owned(self.ls._names()))
     self.assertEquals(self.ls.acquire(['one']), set(['one']))
     self.assertEquals(self.ls.acquire(['one']), set(['one']))
-    self.assertEquals(self.ls._list_owned(), set(['one']))
+    self.assertEquals(self.ls.list_owned(), set(['one']))
     self.ls.release()
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     self.ls.acquire(['one', 'two', 'three'])
     self.ls.acquire(['one', 'two', 'three'])
-    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
+    self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+    self.assertTrue(self.ls.check_owned(self.ls._names()))
+    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
     self.ls.release('one')
     self.ls.release('one')
-    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
+    self.assertFalse(self.ls.check_owned(["one"]))
+    self.assertTrue(self.ls.check_owned(["two", "three"]))
+    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
+    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
+    self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
     self.ls.release(['three'])
     self.ls.release(['three'])
-    self.assertEquals(self.ls._list_owned(), set(['two']))
+    self.assertEquals(self.ls.list_owned(), set(['two']))
     self.ls.release()
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
-    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+    self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
     self.ls.release()
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
+    for name in self.ls._names():
+      self.assertFalse(self.ls.check_owned(name))
 
   def testNoDoubleAcquire(self):
     self.ls.acquire('one')
 
   def testNoDoubleAcquire(self):
     self.ls.acquire('one')
@@ -949,31 +1211,31 @@ class TestLockSet(_ThreadedTestCase):
 
   def testAddRemove(self):
     self.ls.add('four')
 
   def testAddRemove(self):
     self.ls.add('four')
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     self.assert_('four' in self.ls._names())
     self.ls.add(['five', 'six', 'seven'], acquired=1)
     self.assert_('five' in self.ls._names())
     self.assert_('six' in self.ls._names())
     self.assert_('seven' in self.ls._names())
     self.assert_('four' in self.ls._names())
     self.ls.add(['five', 'six', 'seven'], acquired=1)
     self.assert_('five' in self.ls._names())
     self.assert_('six' in self.ls._names())
     self.assert_('seven' in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
+    self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven']))
     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
     self.assert_('five' not in self.ls._names())
     self.assert_('six' not in self.ls._names())
     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
     self.assert_('five' not in self.ls._names())
     self.assert_('six' not in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set(['seven']))
+    self.assertEquals(self.ls.list_owned(), set(['seven']))
     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
     self.ls.remove('seven')
     self.assert_('seven' not in self.ls._names())
     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
     self.ls.remove('seven')
     self.assert_('seven' not in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set([]))
+    self.assertEquals(self.ls.list_owned(), set([]))
     self.ls.acquire(None, shared=1)
     self.assertRaises(AssertionError, self.ls.add, 'eight')
     self.ls.release()
     self.ls.acquire(None)
     self.ls.add('eight', acquired=1)
     self.assert_('eight' in self.ls._names())
     self.ls.acquire(None, shared=1)
     self.assertRaises(AssertionError, self.ls.add, 'eight')
     self.ls.release()
     self.ls.acquire(None)
     self.ls.add('eight', acquired=1)
     self.assert_('eight' in self.ls._names())
-    self.assert_('eight' in self.ls._list_owned())
+    self.assert_('eight' in self.ls.list_owned())
     self.ls.add('nine')
     self.assert_('nine' in self.ls._names())
     self.ls.add('nine')
     self.assert_('nine' in self.ls._names())
-    self.assert_('nine' not in self.ls._list_owned())
+    self.assert_('nine' not in self.ls.list_owned())
     self.ls.release()
     self.ls.remove(['two'])
     self.assert_('two' not in self.ls._names())
     self.ls.release()
     self.ls.remove(['two'])
     self.assert_('two' not in self.ls._names())
@@ -1006,8 +1268,8 @@ class TestLockSet(_ThreadedTestCase):
   def testAcquireSetLock(self):
     # acquire the set-lock exclusively
     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
   def testAcquireSetLock(self):
     # acquire the set-lock exclusively
     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
-    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
-    self.assertEquals(self.ls._is_owned(), True)
+    self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+    self.assertEquals(self.ls.is_owned(), True)
     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
     # I can still add/remove elements...
     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
     # I can still add/remove elements...
     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
@@ -1023,17 +1285,17 @@ class TestLockSet(_ThreadedTestCase):
     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
                       set(['two', 'two', 'three']))
     self.ls.release(['two', 'two'])
     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
                       set(['two', 'two', 'three']))
     self.ls.release(['two', 'two'])
-    self.assertEquals(self.ls._list_owned(), set(['three']))
+    self.assertEquals(self.ls.list_owned(), set(['three']))
 
   def testEmptyAcquire(self):
     # Acquire an empty list of locks...
     self.assertEquals(self.ls.acquire([]), set())
 
   def testEmptyAcquire(self):
     # Acquire an empty list of locks...
     self.assertEquals(self.ls.acquire([]), set())
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     # New locks can still be added
     self.assert_(self.ls.add('six'))
     # "re-acquiring" is not an issue, since we had really acquired nothing
     self.assertEquals(self.ls.acquire([], shared=1), set())
     # New locks can still be added
     self.assert_(self.ls.add('six'))
     # "re-acquiring" is not an issue, since we had really acquired nothing
     self.assertEquals(self.ls.acquire([], shared=1), set())
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     # We haven't really acquired anything, so we cannot release
     self.assertRaises(AssertionError, self.ls.release)
 
     # We haven't really acquired anything, so we cannot release
     self.assertRaises(AssertionError, self.ls.release)
 
@@ -1132,8 +1394,8 @@ class TestLockSet(_ThreadedTestCase):
           self.ls.release()
         else:
           self.assert_(acquired is None)
           self.ls.release()
         else:
           self.assert_(acquired is None)
-          self.assertFalse(self.ls._list_owned())
-          self.assertFalse(self.ls._is_owned())
+          self.assertFalse(self.ls.list_owned())
+          self.assertFalse(self.ls.is_owned())
           self.done.put("not acquired")
 
       self._addThread(target=_AcquireOne)
           self.done.put("not acquired")
 
       self._addThread(target=_AcquireOne)
@@ -1205,7 +1467,7 @@ class TestLockSet(_ThreadedTestCase):
 
         self.ls.release(names=name)
 
 
         self.ls.release(names=name)
 
-      self.assertFalse(self.ls._list_owned())
+      self.assertFalse(self.ls.list_owned())
 
       self._waitThreads()
 
 
       self._waitThreads()
 
@@ -1320,9 +1582,9 @@ class TestLockSet(_ThreadedTestCase):
     self.ls.add('four')
     self.ls.add('five', acquired=1)
     self.ls.add('six', acquired=1, shared=1)
     self.ls.add('four')
     self.ls.add('five', acquired=1)
     self.ls.add('six', acquired=1, shared=1)
-    self.assertEquals(self.ls._list_owned(),
+    self.assertEquals(self.ls.list_owned(),
       set(['one', 'two', 'three', 'five', 'six']))
       set(['one', 'two', 'three', 'five', 'six']))
-    self.assertEquals(self.ls._is_owned(), True)
+    self.assertEquals(self.ls.is_owned(), True)
     self.assertEquals(self.ls._names(),
       set(['one', 'two', 'three', 'four', 'five', 'six']))
     self.ls.release()
     self.assertEquals(self.ls._names(),
       set(['one', 'two', 'three', 'four', 'five', 'six']))
     self.ls.release()
@@ -1361,6 +1623,86 @@ class TestLockSet(_ThreadedTestCase):
     self.assertEqual(self.done.get_nowait(), 'DONE')
     self._setUpLS()
 
     self.assertEqual(self.done.get_nowait(), 'DONE')
     self._setUpLS()
 
+  def testAcquireWithNamesDowngrade(self):
+    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
+    self.assertTrue(self.ls.is_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+    self.ls.release()
+    self.assertFalse(self.ls.is_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+    # Can't downgrade after releasing
+    self.assertRaises(AssertionError, self.ls.downgrade, "two")
+
+  def testDowngrade(self):
+    # Not owning anything, must raise an exception
+    self.assertFalse(self.ls.is_owned())
+    self.assertRaises(AssertionError, self.ls.downgrade)
+
+    self.assertFalse(compat.any(i.is_owned()
+                                for i in self.ls._get_lockdict().values()))
+    self.assertFalse(self.ls.check_owned(self.ls._names()))
+    for name in self.ls._names():
+      self.assertFalse(self.ls.check_owned(name))
+
+    self.assertEquals(self.ls.acquire(None, shared=0),
+                      set(["one", "two", "three"]))
+    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
+
+    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+    for name in self.ls._names():
+      self.assertTrue(self.ls.check_owned(name))
+      self.assertTrue(self.ls.check_owned(name, shared=0))
+      self.assertFalse(self.ls.check_owned(name, shared=1))
+
+    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+    self.assertTrue(compat.all(i.is_owned(shared=0)
+                               for i in self.ls._get_lockdict().values()))
+
+    # Start downgrading locks
+    self.assertTrue(self.ls.downgrade(names=["one"]))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
+                               for name, lock in
+                                 self.ls._get_lockdict().items()))
+
+    self.assertFalse(self.ls.check_owned("one", shared=0))
+    self.assertTrue(self.ls.check_owned("one", shared=1))
+    self.assertTrue(self.ls.check_owned("two", shared=0))
+    self.assertTrue(self.ls.check_owned("three", shared=0))
+
+    # Downgrade second lock
+    self.assertTrue(self.ls.downgrade(names="two"))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
+    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
+                               for name, lock in
+                                 self.ls._get_lockdict().items()))
+
+    self.assertFalse(self.ls.check_owned("one", shared=0))
+    self.assertTrue(self.ls.check_owned("one", shared=1))
+    self.assertFalse(self.ls.check_owned("two", shared=0))
+    self.assertTrue(self.ls.check_owned("two", shared=1))
+    self.assertTrue(self.ls.check_owned("three", shared=0))
+
+    # Downgrading the last exclusive lock to shared must downgrade the
+    # lockset-internal lock too
+    self.assertTrue(self.ls.downgrade(names="three"))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+    self.assertTrue(compat.all(i.is_owned(shared=1)
+                               for i in self.ls._get_lockdict().values()))
+
+    # Verify owned locks
+    for name in self.ls._names():
+      self.assertTrue(self.ls.check_owned(name, shared=1))
+
+    # Downgrading a shared lock must be a no-op
+    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+    self.assertTrue(compat.all(i.is_owned(shared=1)
+                               for i in self.ls._get_lockdict().values()))
+
+    self.ls.release()
+
   def testPriority(self):
     def _Acquire(prev, next, name, priority, success_fn):
       prev.wait()
   def testPriority(self):
     def _Acquire(prev, next, name, priority, success_fn):
       prev.wait()
@@ -1472,38 +1814,41 @@ class TestGanetiLockManager(_ThreadedTestCase):
 
   def testAcquireRelease(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
 
   def testAcquireRelease(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
+    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
+                                        shared=1))
+    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
     self.GL.release(locking.LEVEL_NODE, ['n2'])
     self.GL.release(locking.LEVEL_NODE, ['n2'])
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_NODE)
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_NODEGROUP)
     self.GL.release(locking.LEVEL_INSTANCE)
     self.assertRaises(errors.LockError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i5'])
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
     self.GL.release(locking.LEVEL_NODEGROUP)
     self.GL.release(locking.LEVEL_INSTANCE)
     self.assertRaises(errors.LockError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i5'])
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))
 
   def testAcquireWholeSets(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                       set(self.instances))
 
   def testAcquireWholeSets(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                       set(self.instances))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
                       set(self.instances))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                       set(self.nodegroups))
                       set(self.instances))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                       set(self.nodegroups))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                       set(self.nodegroups))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                       set(self.nodes))
                       set(self.nodegroups))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                       set(self.nodes))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
                       set(self.nodes))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_NODEGROUP)
                       set(self.nodes))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_NODEGROUP)
@@ -1514,11 +1859,11 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                       set(self.instances))
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                       set(self.instances))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
                       set(self.instances))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                       set(['n2']))
                       set(self.instances))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                       set(['n2']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
                       set(['n2']))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_INSTANCE)
                       set(['n2']))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_INSTANCE)
@@ -1638,9 +1983,9 @@ class TestLockMonitor(_ThreadedTestCase):
       locks.append(locking.SharedLock(name, monitor=self.lm))
 
     self.assertEqual(len(self.lm._locks), len(locks))
       locks.append(locking.SharedLock(name, monitor=self.lm))
 
     self.assertEqual(len(self.lm._locks), len(locks))
-
-    self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
-                     100)
+    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+    self.assertEqual(len(result.fields), 1)
+    self.assertEqual(len(result.data), 100)
 
     # Delete all locks
     del locks[:]
 
     # Delete all locks
     del locks[:]
@@ -1684,14 +2029,19 @@ class TestLockMonitor(_ThreadedTestCase):
     # Check order in which locks were added
     self.assertEqual([i.name for i in locks], expnames)
 
     # Check order in which locks were added
     self.assertEqual([i.name for i in locks], expnames)
 
-    # Sync queries are not supported
-    self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
-
     # Check query result
     # Check query result
-    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
-                                        False),
-                     [[name, None, None, []]
+    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+    self.assert_(isinstance(result, dict))
+    response = objects.QueryResponse.FromDict(result)
+    self.assertEqual(response.data,
+                     [[(constants.RS_NORMAL, name),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, [])]
                       for name in utils.NiceSort(expnames)])
                       for name in utils.NiceSort(expnames)])
+    self.assertEqual(len(response.fields), 4)
+    self.assertEqual(["name", "mode", "owner", "pending"],
+                     [fdef.name for fdef in response.fields])
 
     # Test exclusive acquire
     for tlock in locks[::4]:
 
     # Test exclusive acquire
     for tlock in locks[::4]:
@@ -1699,12 +2049,18 @@ class TestLockMonitor(_ThreadedTestCase):
       try:
         def _GetExpResult(name):
           if tlock.name == name:
       try:
         def _GetExpResult(name):
           if tlock.name == name:
-            return [name, "exclusive", [threading.currentThread().getName()],
-                    []]
-          return [name, None, None, []]
-
-        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
-                                             "pending"], False),
+            return [(constants.RS_NORMAL, name),
+                    (constants.RS_NORMAL, "exclusive"),
+                    (constants.RS_NORMAL,
+                     [threading.currentThread().getName()]),
+                    (constants.RS_NORMAL, [])]
+          return [(constants.RS_NORMAL, name),
+                  (constants.RS_NORMAL, None),
+                  (constants.RS_NORMAL, None),
+                  (constants.RS_NORMAL, [])]
+
+        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                          [_GetExpResult(name)
                           for name in utils.NiceSort(expnames)])
       finally:
                          [_GetExpResult(name)
                           for name in utils.NiceSort(expnames)])
       finally:
@@ -1756,47 +2112,64 @@ class TestLockMonitor(_ThreadedTestCase):
             i.wait()
 
           # Check query result
             i.wait()
 
           # Check query result
-          for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
-                                                         "owner"], False):
-            if name == tlock1.name:
-              self.assertEqual(mode, "shared")
-              self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
+          result = self.lm.QueryLocks(["name", "mode", "owner"])
+          response = objects.QueryResponse.FromDict(result)
+          for (name, mode, owner) in response.data:
+            (name_status, name_value) = name
+            (owner_status, owner_value) = owner
+
+            self.assertEqual(name_status, constants.RS_NORMAL)
+            self.assertEqual(owner_status, constants.RS_NORMAL)
+
+            if name_value == tlock1.name:
+              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+              self.assertEqual(set(owner_value),
+                               set(i.getName() for i in tthreads1))
               continue
 
               continue
 
-            if name == tlock2.name:
-              self.assertEqual(mode, "shared")
-              self.assertEqual(owner, [tthread2.getName()])
+            if name_value == tlock2.name:
+              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+              self.assertEqual(owner_value, [tthread2.getName()])
               continue
 
               continue
 
-            if name == tlock3.name:
-              self.assertEqual(mode, "exclusive")
-              self.assertEqual(owner, [tthread3.getName()])
+            if name_value == tlock3.name:
+              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
+              self.assertEqual(owner_value, [tthread3.getName()])
               continue
 
               continue
 
-            self.assert_(name in expnames)
-            self.assert_(mode is None)
-            self.assert_(owner is None)
+            self.assert_(name_value in expnames)
+            self.assertEqual(mode, (constants.RS_NORMAL, None))
+            self.assert_(owner_value is None)
 
           # Release locks again
           releaseev.set()
 
           self._waitThreads()
 
 
           # Release locks again
           releaseev.set()
 
           self._waitThreads()
 
-          self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
-                           [[name, None, None]
+          result = self.lm.QueryLocks(["name", "mode", "owner"])
+          self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                           [[(constants.RS_NORMAL, name),
+                             (constants.RS_NORMAL, None),
+                             (constants.RS_NORMAL, None)]
                             for name in utils.NiceSort(expnames)])
 
   def testDelete(self):
     lock = locking.SharedLock("TestLock", monitor=self.lm)
 
     self.assertEqual(len(self.lm._locks), 1)
                             for name in utils.NiceSort(expnames)])
 
   def testDelete(self):
     lock = locking.SharedLock("TestLock", monitor=self.lm)
 
     self.assertEqual(len(self.lm._locks), 1)
-    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
-                     [[lock.name, None, None]])
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lock.name),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]])
 
     lock.delete()
 
 
     lock.delete()
 
-    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
-                     [[lock.name, "deleted", None]])
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lock.name),
+                       (constants.RS_NORMAL, "deleted"),
+                       (constants.RS_NORMAL, None)]])
     self.assertEqual(len(self.lm._locks), 1)
 
   def testPending(self):
     self.assertEqual(len(self.lm._locks), 1)
 
   def testPending(self):
@@ -1815,9 +2188,12 @@ class TestLockMonitor(_ThreadedTestCase):
       lock.acquire()
       try:
         self.assertEqual(len(self.lm._locks), 1)
       lock.acquire()
       try:
         self.assertEqual(len(self.lm._locks), 1)
-        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
-                         [[lock.name, "exclusive",
-                           [threading.currentThread().getName()]]])
+        result = self.lm.QueryLocks(["name", "mode", "owner"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [[(constants.RS_NORMAL, lock.name),
+                           (constants.RS_NORMAL, "exclusive"),
+                           (constants.RS_NORMAL,
+                            [threading.currentThread().getName()])]])
 
         threads = []
 
 
         threads = []
 
@@ -1843,15 +2219,17 @@ class TestLockMonitor(_ThreadedTestCase):
 
         # All acquires are waiting now
         if shared:
 
         # All acquires are waiting now
         if shared:
-          pending = [("shared", sorted([t.getName() for t in threads]))]
+          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
         else:
           pending = [("exclusive", [t.getName()]) for t in threads]
 
         else:
           pending = [("exclusive", [t.getName()]) for t in threads]
 
-        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
-                                             "pending"], False),
-                         [[lock.name, "exclusive",
-                           [threading.currentThread().getName()],
-                           pending]])
+        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                         [[(constants.RS_NORMAL, lock.name),
+                           (constants.RS_NORMAL, "exclusive"),
+                           (constants.RS_NORMAL,
+                            [threading.currentThread().getName()]),
+                           (constants.RS_NORMAL, pending)]])
 
         self.assertEqual(len(self.lm._locks), 1)
       finally:
 
         self.assertEqual(len(self.lm._locks), 1)
       finally:
@@ -1860,12 +2238,171 @@ class TestLockMonitor(_ThreadedTestCase):
       self._waitThreads()
 
       # No pending acquires
       self._waitThreads()
 
       # No pending acquires
-      self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
-                                          False),
-                       [[lock.name, None, None, []]])
+      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                       [[(constants.RS_NORMAL, lock.name),
+                         (constants.RS_NORMAL, None),
+                         (constants.RS_NORMAL, None),
+                         (constants.RS_NORMAL, [])]])
 
       self.assertEqual(len(self.lm._locks), 1)
 
 
       self.assertEqual(len(self.lm._locks), 1)
 
+  def testDeleteAndRecreate(self):  # same-named locks are tracked separately
+    lname = "TestLock101923193"
+
+    # Create five locks sharing one name; the list keeps a reference to each
+    locks = [locking.SharedLock(lname, monitor=self.lm)
+             for _ in range(5)]
+
+    self.assertEqual(len(self.lm._locks), len(locks))
+
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]] * 5)
+
+    locks[2].delete()
+
+    # Still five rows in the same order; only the third now has mode "deleted"
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]] * 2 +
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, "deleted"),
+                       (constants.RS_NORMAL, None)]] +
+                     [[(constants.RS_NORMAL, lname),
+                       (constants.RS_NORMAL, None),
+                       (constants.RS_NORMAL, None)]] * 2)
+
+    locks[1].acquire(shared=0)  # exclusive; not released anywhere in this test
+
+    last_status = [  # expected rows, one per lock, in list order
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, None),
+       (constants.RS_NORMAL, None)],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, "exclusive"),
+       (constants.RS_NORMAL, [threading.currentThread().getName()])],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, "deleted"),
+       (constants.RS_NORMAL, None)],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, None),
+       (constants.RS_NORMAL, None)],
+      [(constants.RS_NORMAL, lname),
+       (constants.RS_NORMAL, None),
+       (constants.RS_NORMAL, None)],
+      ]
+
+    # The second row must now show the exclusive owner; order is unchanged
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
+
+    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
+    self.assertEqual(len(self.lm._locks), len(locks))
+
+    # Drop the references one by one from the front and re-check after each
+    for idx in range(len(locks)):
+      del locks[0]  # the list no longer references the first remaining lock
+      assert gc.isenabled()
+      gc.collect()  # run a collection before inspecting the monitor
+      self.assertEqual(len(self.lm._locks), len(locks))
+      result = self.lm.QueryLocks(["name", "mode", "owner"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data,
+                       last_status[idx + 1:])  # rows of the locks still held
+
+    # No references are left, so the list and the monitor must both be empty
+    assert not locks
+    self.assertFalse(self.lm._locks)
+
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+
+  class _FakeLock:
+    """Lock stand-in that replays canned GetLockInfo answers in FIFO order."""
+    def __init__(self):
+      self._info = []  # queue of (expected request, canned result) tuples
+
+    def AddResult(self, *args):
+      self._info += [args]
+
+    def CountPending(self):
+      return len(self._info)
+
+    def GetLockInfo(self, requested):
+      (wanted, answer) = self._info.pop(0)
+      if wanted == requested:
+        return answer
+
+      raise Exception("Requested information (%s) does not match"
+                      " expectations (%s)" % (requested, wanted))
+
+  def testMultipleResults(self):  # locks may report any number of info entries
+    fl1 = self._FakeLock()
+    fl2 = self._FakeLock()
+
+    self.lm.RegisterLock(fl1)
+    self.lm.RegisterLock(fl2)
+
+    # Both fakes answer with no entries: no rows, and each answer is consumed
+    for i in [fl1, fl2]:
+      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+    for i in [fl1, fl2]:
+      self.assertEqual(i.CountPending(), 0)
+
+    # Rows must come back ordered by name, whatever order the lock reports
+    for fn in [lambda x: x, reversed, sorted]:
+      fl1.AddResult(set(), list(fn([
+        ("aaa", None, None, None),  # presumably (name, mode, owner, pending)
+        ("bbb", None, None, None),
+        ])))
+      fl2.AddResult(set(), [])  # "name" alone requests an empty info set
+      result = self.lm.QueryLocks(["name"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+        [(constants.RS_NORMAL, "aaa")],
+        [(constants.RS_NORMAL, "bbb")],
+        ])
+      for i in [fl1, fl2]:
+        self.assertEqual(i.CountPending(), 0)
+
+      for fn2 in [lambda x: x, reversed, sorted]:  # each fl1/fl2 order pairing
+        fl1.AddResult(set([query.LQ_MODE]), list(fn([
+          # Same name, but different information
+          ("aaa", "mode0", None, None),
+          ("aaa", "mode1", None, None),
+          ("aaa", "mode2", None, None),
+          ("aaa", "mode3", None, None),
+          ])))
+        fl2.AddResult(set([query.LQ_MODE]), [
+          ("zzz", "end", None, None),
+          ("000", "start", None, None),
+          ] + list(fn2([
+          ("aaa", "b200", None, None),
+          ("aaa", "b300", None, None),
+          ])))
+        result = self.lm.QueryLocks(["name", "mode"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
+          ] + list(fn([
+          # Equal names keep incoming order: fl1's entries first, then fl2's
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
+          ])) + list(fn2([
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
+          ])) + [
+          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
+          ])
+        for i in [fl1, fl2]:
+          self.assertEqual(i.CountPending(), 0)
+
 
 if __name__ == '__main__':
   testutils.GanetiTestProgram()
 
 if __name__ == '__main__':
   testutils.GanetiTestProgram()