locking: Add test for downgrade without names
[ganeti-local] / test / ganeti.locking_unittest.py
index b17e7e8..494994c 100755 (executable)
@@ -16,7 +16,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 0.0510-1301, USA.
+# 02110-1301, USA.
 
 
 """Script for unittesting the locking module"""
@@ -99,7 +99,7 @@ class _ConditionTestCase(_ThreadedTestCase):
 
   def _testAcquireRelease(self):
     self.assertFalse(self.cond._is_owned())
-    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.wait, None)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
     self.cond.acquire()
@@ -109,7 +109,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.cond.release()
 
     self.assertFalse(self.cond._is_owned())
-    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.wait, None)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
   def _testNotification(self):
@@ -125,7 +125,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self._addThread(target=_NotifyAll)
     self.assertEqual(self.done.get(True, 1), "NE")
     self.assertRaises(Queue.Empty, self.done.get_nowait)
-    self.cond.wait()
+    self.cond.wait(None)
     self.assertEqual(self.done.get(True, 1), "NA")
     self.assertEqual(self.done.get(True, 1), "NN")
     self.assert_(self.cond._is_owned())
@@ -154,7 +154,7 @@ class TestSingleNotifyPipeCondition(_ConditionTestCase):
   def testNoNotifyReuse(self):
     self.cond.acquire()
     self.cond.notifyAll()
-    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.wait, None)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
     self.cond.release()
 
@@ -220,7 +220,7 @@ class TestPipeCondition(_ConditionTestCase):
     def _BlockingWait():
       self.cond.acquire()
       self.done.put("A")
-      self.cond.wait()
+      self.cond.wait(None)
       self.cond.release()
       self.done.put("W")
 
@@ -269,25 +269,25 @@ class TestSharedLock(_ThreadedTestCase):
     self.sl = locking.SharedLock("TestSharedLock")
 
   def testSequenceAndOwnership(self):
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
     self.sl.acquire(shared=1)
-    self.assert_(self.sl._is_owned())
-    self.assert_(self.sl._is_owned(shared=1))
-    self.assertFalse(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned())
+    self.assert_(self.sl.is_owned(shared=1))
+    self.assertFalse(self.sl.is_owned(shared=0))
     self.sl.release()
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
     self.sl.acquire()
-    self.assert_(self.sl._is_owned())
-    self.assertFalse(self.sl._is_owned(shared=1))
-    self.assert_(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned())
+    self.assertFalse(self.sl.is_owned(shared=1))
+    self.assert_(self.sl.is_owned(shared=0))
     self.sl.release()
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
     self.sl.acquire(shared=1)
-    self.assert_(self.sl._is_owned())
-    self.assert_(self.sl._is_owned(shared=1))
-    self.assertFalse(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned())
+    self.assert_(self.sl.is_owned(shared=1))
+    self.assertFalse(self.sl.is_owned(shared=0))
     self.sl.release()
-    self.assertFalse(self.sl._is_owned())
+    self.assertFalse(self.sl.is_owned())
 
   def testBooleanValue(self):
     # semaphores are supposed to return a true value on a successful acquire
@@ -433,7 +433,31 @@ class TestSharedLock(_ThreadedTestCase):
     self.assertRaises(errors.LockError, self.sl.delete)
 
   def testDeleteTimeout(self):
-    self.sl.delete(timeout=60)
+    self.assertTrue(self.sl.delete(timeout=60))
+
+  def testDeleteTimeoutFail(self):
+    ready = threading.Event()
+    finish = threading.Event()
+
+    def fn():
+      self.sl.acquire(shared=0)
+      ready.set()
+
+      finish.wait()
+      self.sl.release()
+
+    self._addThread(target=fn)
+    ready.wait()
+
+    # Test if deleting a lock owned in exclusive mode by another thread fails
+    # to delete when a timeout is used
+    self.assertFalse(self.sl.delete(timeout=0.02))
+
+    finish.set()
+    self._waitThreads()
+
+    self.assertTrue(self.sl.delete())
+    self.assertRaises(errors.LockError, self.sl.acquire)
 
   def testNoDeleteIfSharer(self):
     self.sl.acquire(shared=1)
@@ -612,6 +636,117 @@ class TestSharedLock(_ThreadedTestCase):
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
+  def testIllegalDowngrade(self):
+    # Not yet acquired
+    self.assertRaises(AssertionError, self.sl.downgrade)
+
+    # Acquire in shared mode, downgrade should be no-op
+    self.assertTrue(self.sl.acquire(shared=1))
+    self.assertTrue(self.sl.is_owned(shared=1))
+    self.assertTrue(self.sl.downgrade())
+    self.assertTrue(self.sl.is_owned(shared=1))
+    self.sl.release()
+
+  def testDowngrade(self):
+    self.assertTrue(self.sl.acquire())
+    self.assertTrue(self.sl.is_owned(shared=0))
+    self.assertTrue(self.sl.downgrade())
+    self.assertTrue(self.sl.is_owned(shared=1))
+    self.sl.release()
+
+  @_Repeat
+  def testDowngradeJumpsAheadOfExclusive(self):
+    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
+      self.assertTrue(self.sl.acquire())
+      self.assertTrue(self.sl.is_owned(shared=0))
+      ev_got.set()
+      ev_downgrade.wait()
+      self.assertTrue(self.sl.is_owned(shared=0))
+      self.assertTrue(self.sl.downgrade())
+      self.assertTrue(self.sl.is_owned(shared=1))
+      ev_release.wait()
+      self.assertTrue(self.sl.is_owned(shared=1))
+      self.sl.release()
+
+    def _KeepExclusive2(ev_started, ev_release):
+      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
+      self.assertTrue(self.sl.is_owned(shared=0))
+      ev_release.wait()
+      self.assertTrue(self.sl.is_owned(shared=0))
+      self.sl.release()
+
+    def _KeepShared(ev_started, ev_got, ev_release):
+      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
+      self.assertTrue(self.sl.is_owned(shared=1))
+      ev_got.set()
+      ev_release.wait()
+      self.assertTrue(self.sl.is_owned(shared=1))
+      self.sl.release()
+
+    # Acquire lock in exclusive mode
+    ev_got_excl1 = threading.Event()
+    ev_downgrade_excl1 = threading.Event()
+    ev_release_excl1 = threading.Event()
+    th_excl1 = self._addThread(target=_KeepExclusive,
+                               args=(ev_got_excl1, ev_downgrade_excl1,
+                                     ev_release_excl1))
+    ev_got_excl1.wait()
+
+    # Start a second exclusive acquire
+    ev_started_excl2 = threading.Event()
+    ev_release_excl2 = threading.Event()
+    th_excl2 = self._addThread(target=_KeepExclusive2,
+                               args=(ev_started_excl2, ev_release_excl2))
+    ev_started_excl2.wait()
+
+    # Start shared acquires, will jump ahead of second exclusive acquire when
+    # first exclusive acquire downgrades
+    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
+    ev_release_shared = threading.Event()
+
+    th_shared = [self._addThread(target=_KeepShared,
+                                 args=(ev_started, ev_got, ev_release_shared))
+                 for (ev_started, ev_got) in ev_shared]
+
+    # Wait for all shared acquires to start
+    for (ev, _) in ev_shared:
+      ev.wait()
+
+    # Check lock information
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
+    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
+    self.assertEqual([(pendmode, sorted(waiting))
+                      for (pendmode, waiting) in pending],
+                     [("exclusive", [th_excl2.getName()]),
+                      ("shared", sorted(th.getName() for th in th_shared))])
+
+    # Shared acquires won't start until the exclusive lock is downgraded
+    ev_downgrade_excl1.set()
+
+    # Wait for all shared acquires to be successful
+    for (_, ev) in ev_shared:
+      ev.wait()
+
+    # Check lock information again
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
+                                              query.LQ_PENDING])),
+                     [(self.sl.name, "shared", None,
+                       [("exclusive", [th_excl2.getName()])])])
+    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
+    self.assertEqual(set(owner), set([th_excl1.getName()] +
+                                     [th.getName() for th in th_shared]))
+
+    ev_release_excl1.set()
+    ev_release_excl2.set()
+    ev_release_shared.set()
+
+    self._waitThreads()
+
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
+                                              query.LQ_PENDING])),
+                     [(self.sl.name, None, None, [])])
+
   @_Repeat
   def testMixedAcquireTimeout(self):
     sync = threading.Event()
@@ -777,12 +912,14 @@ class TestSharedLock(_ThreadedTestCase):
     prev.wait()
 
     # Check lock information
-    self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
-    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
-                     (self.sl.name, "exclusive",
-                      [threading.currentThread().getName()], None))
+    self.assertEqual(self.sl.GetLockInfo(set()),
+                     [(self.sl.name, None, None, None)])
+    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+                     [(self.sl.name, "exclusive",
+                       [threading.currentThread().getName()], None)])
 
-    self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
+    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
+                            perprio)
 
     # Let threads acquire the lock
     self.sl.release()
@@ -803,7 +940,7 @@ class TestSharedLock(_ThreadedTestCase):
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
-  def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
+  def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
     self.assertEqual(name, self.sl.name)
     self.assert_(mode is None)
     self.assert_(owner is None)
@@ -816,6 +953,84 @@ class TestSharedLock(_ThreadedTestCase):
                                        for i in sorted(perprio.keys())]
                       for (shared, _, threads) in acquires])
 
+  class _FakeTimeForSpuriousNotifications:
+    def __init__(self, now, check_end):
+      self.now = now
+      self.check_end = check_end
+
+      # Deterministic random number generator
+      self.rnd = random.Random(15086)
+
+    def time(self):
+      # Advance time if the random number generator thinks so (this is to test
+      # multiple notifications without advancing the time)
+      if self.rnd.random() < 0.3:
+        self.now += self.rnd.random()
+
+      self.check_end(self.now)
+
+      return self.now
+
+  @_Repeat
+  def testAcquireTimeoutWithSpuriousNotifications(self):
+    ready = threading.Event()
+    locked = threading.Event()
+    req = Queue.Queue(0)
+
+    epoch = 4000.0
+    timeout = 60.0
+
+    def check_end(now):
+      self.assertFalse(locked.isSet())
+
+      # If we waited long enough (in virtual time), tell main thread to release
+      # lock, otherwise tell it to notify once more
+      req.put(now < (epoch + (timeout * 0.8)))
+
+    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+    sl = locking.SharedLock("test", _time_fn=time_fn)
+
+    # Acquire in exclusive mode
+    sl.acquire(shared=0)
+
+    def fn():
+      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+                                 test_notify=ready.set))
+      locked.set()
+      sl.release()
+      self.done.put("success")
+
+    # Start acquire with timeout and wait for it to be ready
+    self._addThread(target=fn)
+    ready.wait()
+
+    # The separate thread is now waiting to acquire the lock, so start sending
+    # spurious notifications.
+
+    # Wait for separate thread to ask for another notification
+    count = 0
+    while req.get():
+      # After sending the notification, the lock will take a short amount of
+      # time to notice and to retrieve the current time
+      sl._notify_topmost()
+      count += 1
+
+    self.assertTrue(count > 100, "Not enough notifications were sent")
+
+    self.assertFalse(locked.isSet())
+
+    # Some notifications have been sent, now actually release the lock
+    sl.release()
+
+    # Wait for lock to be acquired
+    locked.wait()
+
+    self._waitThreads()
+
+    self.assertEqual(self.done.get_nowait(), "success")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
 
 class TestSharedLockInCondition(_ThreadedTestCase):
   """SharedLock as a condition lock tests"""
@@ -830,14 +1045,14 @@ class TestSharedLockInCondition(_ThreadedTestCase):
 
   def testKeepMode(self):
     self.cond.acquire(shared=1)
-    self.assert_(self.sl._is_owned(shared=1))
+    self.assert_(self.sl.is_owned(shared=1))
     self.cond.wait(0)
-    self.assert_(self.sl._is_owned(shared=1))
+    self.assert_(self.sl.is_owned(shared=1))
     self.cond.release()
     self.cond.acquire(shared=0)
-    self.assert_(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned(shared=0))
     self.cond.wait(0)
-    self.assert_(self.sl._is_owned(shared=0))
+    self.assert_(self.sl.is_owned(shared=0))
     self.cond.release()
 
 
@@ -856,19 +1071,19 @@ class TestSSynchronizedDecorator(_ThreadedTestCase):
 
   @locking.ssynchronized(_decoratorlock)
   def _doItExclusive(self):
-    self.assert_(_decoratorlock._is_owned())
+    self.assert_(_decoratorlock.is_owned())
     self.done.put('EXC')
 
   @locking.ssynchronized(_decoratorlock, shared=1)
   def _doItSharer(self):
-    self.assert_(_decoratorlock._is_owned(shared=1))
+    self.assert_(_decoratorlock.is_owned(shared=1))
     self.done.put('SHR')
 
   def testDecoratedFunctions(self):
     self._doItExclusive()
-    self.assertFalse(_decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock.is_owned())
     self._doItSharer()
-    self.assertFalse(_decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock.is_owned())
 
   def testSharersCanCoexist(self):
     _decoratorlock.acquire(shared=1)
@@ -922,27 +1137,61 @@ class TestLockSet(_ThreadedTestCase):
     newls = locking.LockSet([], "TestLockSet.testResources")
     self.assertEquals(newls._names(), set())
 
+  def testCheckOwnedUnknown(self):
+    self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
+    for shared in [-1, 0, 1, 6378, 24255]:
+      self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
+                                           shared=shared))
+
+  def testCheckOwnedUnknownWhileHolding(self):
+    self.assertFalse(self.ls.check_owned([]))
+    self.ls.acquire("one", shared=1)
+    self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
+    self.assertTrue(self.ls.check_owned("one", shared=1))
+    self.assertFalse(self.ls.check_owned("one", shared=0))
+    self.assertFalse(self.ls.check_owned(["one", "two"]))
+    self.assertRaises(errors.LockError, self.ls.check_owned,
+                      ["one", "nonexist"])
+    self.assertRaises(errors.LockError, self.ls.check_owned, "")
+    self.ls.release()
+    self.assertFalse(self.ls.check_owned([]))
+    self.assertFalse(self.ls.check_owned("one"))
+
   def testAcquireRelease(self):
+    self.assertFalse(self.ls.check_owned(self.ls._names()))
     self.assert_(self.ls.acquire('one'))
-    self.assertEquals(self.ls._list_owned(), set(['one']))
+    self.assertEquals(self.ls.list_owned(), set(['one']))
+    self.assertTrue(self.ls.check_owned("one"))
+    self.assertTrue(self.ls.check_owned("one", shared=0))
+    self.assertFalse(self.ls.check_owned("one", shared=1))
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
+    self.assertFalse(self.ls.check_owned(self.ls._names()))
     self.assertEquals(self.ls.acquire(['one']), set(['one']))
-    self.assertEquals(self.ls._list_owned(), set(['one']))
+    self.assertEquals(self.ls.list_owned(), set(['one']))
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     self.ls.acquire(['one', 'two', 'three'])
-    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
+    self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+    self.assertTrue(self.ls.check_owned(self.ls._names()))
+    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+    self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
     self.ls.release('one')
-    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
+    self.assertFalse(self.ls.check_owned(["one"]))
+    self.assertTrue(self.ls.check_owned(["two", "three"]))
+    self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
+    self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
+    self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
     self.ls.release(['three'])
-    self.assertEquals(self.ls._list_owned(), set(['two']))
+    self.assertEquals(self.ls.list_owned(), set(['two']))
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
-    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+    self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
     self.ls.release()
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
+    for name in self.ls._names():
+      self.assertFalse(self.ls.check_owned(name))
 
   def testNoDoubleAcquire(self):
     self.ls.acquire('one')
@@ -962,31 +1211,31 @@ class TestLockSet(_ThreadedTestCase):
 
   def testAddRemove(self):
     self.ls.add('four')
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     self.assert_('four' in self.ls._names())
     self.ls.add(['five', 'six', 'seven'], acquired=1)
     self.assert_('five' in self.ls._names())
     self.assert_('six' in self.ls._names())
     self.assert_('seven' in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
+    self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven']))
     self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
     self.assert_('five' not in self.ls._names())
     self.assert_('six' not in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set(['seven']))
+    self.assertEquals(self.ls.list_owned(), set(['seven']))
     self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
     self.ls.remove('seven')
     self.assert_('seven' not in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set([]))
+    self.assertEquals(self.ls.list_owned(), set([]))
     self.ls.acquire(None, shared=1)
     self.assertRaises(AssertionError, self.ls.add, 'eight')
     self.ls.release()
     self.ls.acquire(None)
     self.ls.add('eight', acquired=1)
     self.assert_('eight' in self.ls._names())
-    self.assert_('eight' in self.ls._list_owned())
+    self.assert_('eight' in self.ls.list_owned())
     self.ls.add('nine')
     self.assert_('nine' in self.ls._names())
-    self.assert_('nine' not in self.ls._list_owned())
+    self.assert_('nine' not in self.ls.list_owned())
     self.ls.release()
     self.ls.remove(['two'])
     self.assert_('two' not in self.ls._names())
@@ -1019,8 +1268,8 @@ class TestLockSet(_ThreadedTestCase):
   def testAcquireSetLock(self):
     # acquire the set-lock exclusively
     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
-    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
-    self.assertEquals(self.ls._is_owned(), True)
+    self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+    self.assertEquals(self.ls.is_owned(), True)
     self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
     # I can still add/remove elements...
     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
@@ -1036,17 +1285,17 @@ class TestLockSet(_ThreadedTestCase):
     self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
                       set(['two', 'two', 'three']))
     self.ls.release(['two', 'two'])
-    self.assertEquals(self.ls._list_owned(), set(['three']))
+    self.assertEquals(self.ls.list_owned(), set(['three']))
 
   def testEmptyAcquire(self):
     # Acquire an empty list of locks...
     self.assertEquals(self.ls.acquire([]), set())
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     # New locks can still be addded
     self.assert_(self.ls.add('six'))
     # "re-acquiring" is not an issue, since we had really acquired nothing
     self.assertEquals(self.ls.acquire([], shared=1), set())
-    self.assertEquals(self.ls._list_owned(), set())
+    self.assertEquals(self.ls.list_owned(), set())
     # We haven't really acquired anything, so we cannot release
     self.assertRaises(AssertionError, self.ls.release)
 
@@ -1145,8 +1394,8 @@ class TestLockSet(_ThreadedTestCase):
           self.ls.release()
         else:
           self.assert_(acquired is None)
-          self.assertFalse(self.ls._list_owned())
-          self.assertFalse(self.ls._is_owned())
+          self.assertFalse(self.ls.list_owned())
+          self.assertFalse(self.ls.is_owned())
           self.done.put("not acquired")
 
       self._addThread(target=_AcquireOne)
@@ -1218,7 +1467,7 @@ class TestLockSet(_ThreadedTestCase):
 
         self.ls.release(names=name)
 
-      self.assertFalse(self.ls._list_owned())
+      self.assertFalse(self.ls.list_owned())
 
       self._waitThreads()
 
@@ -1333,9 +1582,9 @@ class TestLockSet(_ThreadedTestCase):
     self.ls.add('four')
     self.ls.add('five', acquired=1)
     self.ls.add('six', acquired=1, shared=1)
-    self.assertEquals(self.ls._list_owned(),
+    self.assertEquals(self.ls.list_owned(),
       set(['one', 'two', 'three', 'five', 'six']))
-    self.assertEquals(self.ls._is_owned(), True)
+    self.assertEquals(self.ls.is_owned(), True)
     self.assertEquals(self.ls._names(),
       set(['one', 'two', 'three', 'four', 'five', 'six']))
     self.ls.release()
@@ -1374,6 +1623,101 @@ class TestLockSet(_ThreadedTestCase):
     self.assertEqual(self.done.get_nowait(), 'DONE')
     self._setUpLS()
 
+  def testAcquireWithNamesDowngrade(self):
+    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
+    self.assertTrue(self.ls.is_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+    self.ls.release()
+    self.assertFalse(self.ls.is_owned())
+    self.assertFalse(self.ls._get_lock().is_owned())
+    # Can't downgrade after releasing
+    self.assertRaises(AssertionError, self.ls.downgrade, "two")
+
+  def testDowngrade(self):
+    # Not owning anything, must raise an exception
+    self.assertFalse(self.ls.is_owned())
+    self.assertRaises(AssertionError, self.ls.downgrade)
+
+    self.assertFalse(compat.any(i.is_owned()
+                                for i in self.ls._get_lockdict().values()))
+    self.assertFalse(self.ls.check_owned(self.ls._names()))
+    for name in self.ls._names():
+      self.assertFalse(self.ls.check_owned(name))
+
+    self.assertEquals(self.ls.acquire(None, shared=0),
+                      set(["one", "two", "three"]))
+    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
+
+    self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+    for name in self.ls._names():
+      self.assertTrue(self.ls.check_owned(name))
+      self.assertTrue(self.ls.check_owned(name, shared=0))
+      self.assertFalse(self.ls.check_owned(name, shared=1))
+
+    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+    self.assertTrue(compat.all(i.is_owned(shared=0)
+                               for i in self.ls._get_lockdict().values()))
+
+    # Start downgrading locks
+    self.assertTrue(self.ls.downgrade(names=["one"]))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+    self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
+                               for name, lock in
+                                 self.ls._get_lockdict().items()))
+
+    self.assertFalse(self.ls.check_owned("one", shared=0))
+    self.assertTrue(self.ls.check_owned("one", shared=1))
+    self.assertTrue(self.ls.check_owned("two", shared=0))
+    self.assertTrue(self.ls.check_owned("three", shared=0))
+
+    # Downgrade second lock
+    self.assertTrue(self.ls.downgrade(names="two"))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
+    self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
+                               for name, lock in
+                                 self.ls._get_lockdict().items()))
+
+    self.assertFalse(self.ls.check_owned("one", shared=0))
+    self.assertTrue(self.ls.check_owned("one", shared=1))
+    self.assertFalse(self.ls.check_owned("two", shared=0))
+    self.assertTrue(self.ls.check_owned("two", shared=1))
+    self.assertTrue(self.ls.check_owned("three", shared=0))
+
+    # Downgrading the last exclusive lock to shared must downgrade the
+    # lockset-internal lock too
+    self.assertTrue(self.ls.downgrade(names="three"))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+    self.assertTrue(compat.all(i.is_owned(shared=1)
+                               for i in self.ls._get_lockdict().values()))
+
+    # Verify owned locks
+    for name in self.ls._names():
+      self.assertTrue(self.ls.check_owned(name, shared=1))
+
+    # Downgrading a shared lock must be a no-op
+    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
+    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+    self.assertTrue(compat.all(i.is_owned(shared=1)
+                               for i in self.ls._get_lockdict().values()))
+
+    self.ls.release()
+
+  def testDowngradeEverything(self):
+    self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
+                     set(["one", "two", "three"]))
+
+    # Ensure all locks are now owned in exclusive mode
+    for name in self.ls._names():
+      self.assertTrue(self.ls.check_owned(name, shared=0))
+
+    # Downgrade everything
+    self.assertTrue(self.ls.downgrade())
+
+    # Ensure all locks are now owned in shared mode
+    for name in self.ls._names():
+      self.assertTrue(self.ls.check_owned(name, shared=1))
+
   def testPriority(self):
     def _Acquire(prev, next, name, priority, success_fn):
       prev.wait()
@@ -1433,8 +1777,9 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.nodes=['n1', 'n2']
     self.nodegroups=['g1', 'g2']
     self.instances=['i1', 'i2', 'i3']
+    self.networks=['net1', 'net2', 'net3']
     self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
-                                        self.instances)
+                                        self.instances, self.networks)
 
   def tearDown(self):
     # Don't try this at home...
@@ -1449,7 +1794,7 @@ class TestGanetiLockManager(_ThreadedTestCase):
       self.assertEqual(i, locking.LEVELS[i])
 
   def testDoubleGLFails(self):
-    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
+    self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
 
   def testLockNames(self):
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
@@ -1458,65 +1803,81 @@ class TestGanetiLockManager(_ThreadedTestCase):
                      set(self.nodegroups))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                      set(self.instances))
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
+                     set(self.networks))
 
   def testInitAndResources(self):
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager([], [], [])
+    self.GL = locking.GanetiLockManager([], [], [], [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
 
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
+    self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                                     set(self.nodegroups))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
 
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager([], [], self.instances)
+    self.GL = locking.GanetiLockManager([], [], self.instances, [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
     self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                      set(self.instances))
 
+    locking.GanetiLockManager._instance = None
+    self.GL = locking.GanetiLockManager([], [], [], self.networks)
+    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
+    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
+    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
+                     set(self.networks))
+
   def testAcquireRelease(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
     self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
+    self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
+                                        shared=1))
+    self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
     self.GL.release(locking.LEVEL_NODE, ['n2'])
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_NODE)
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_NODEGROUP)
     self.GL.release(locking.LEVEL_INSTANCE)
     self.assertRaises(errors.LockError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i5'])
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))
 
   def testAcquireWholeSets(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                       set(self.instances))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
                       set(self.instances))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                       set(self.nodegroups))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                       set(self.nodegroups))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                       set(self.nodes))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
                       set(self.nodes))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_NODEGROUP)
@@ -1527,11 +1888,11 @@ class TestGanetiLockManager(_ThreadedTestCase):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                       set(self.instances))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
                       set(self.instances))
     self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                       set(['n2']))
-    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+    self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
                       set(['n2']))
     self.GL.release(locking.LEVEL_NODE)
     self.GL.release(locking.LEVEL_INSTANCE)
@@ -1989,6 +2350,88 @@ class TestLockMonitor(_ThreadedTestCase):
     result = self.lm.QueryLocks(["name", "mode", "owner"])
     self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
 
+  class _FakeLock:
+    def __init__(self):
+      self._info = []
+
+    def AddResult(self, *args):
+      self._info.append(args)
+
+    def CountPending(self):
+      return len(self._info)
+
+    def GetLockInfo(self, requested):
+      (exp_requested, result) = self._info.pop(0)
+
+      if exp_requested != requested:
+        raise Exception("Requested information (%s) does not match"
+                        " expectations (%s)" % (requested, exp_requested))
 
-if __name__ == '__main__':
+      return result
+
+  def testMultipleResults(self):
+    fl1 = self._FakeLock()
+    fl2 = self._FakeLock()
+
+    self.lm.RegisterLock(fl1)
+    self.lm.RegisterLock(fl2)
+
+    # Empty information
+    for i in [fl1, fl2]:
+      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
+    result = self.lm.QueryLocks(["name", "mode", "owner"])
+    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+    for i in [fl1, fl2]:
+      self.assertEqual(i.CountPending(), 0)
+
+    # Check ordering
+    for fn in [lambda x: x, reversed, sorted]:
+      fl1.AddResult(set(), list(fn([
+        ("aaa", None, None, None),
+        ("bbb", None, None, None),
+        ])))
+      fl2.AddResult(set(), [])
+      result = self.lm.QueryLocks(["name"])
+      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+        [(constants.RS_NORMAL, "aaa")],
+        [(constants.RS_NORMAL, "bbb")],
+        ])
+      for i in [fl1, fl2]:
+        self.assertEqual(i.CountPending(), 0)
+
+      for fn2 in [lambda x: x, reversed, sorted]:
+        fl1.AddResult(set([query.LQ_MODE]), list(fn([
+          # Same name, but different information
+          ("aaa", "mode0", None, None),
+          ("aaa", "mode1", None, None),
+          ("aaa", "mode2", None, None),
+          ("aaa", "mode3", None, None),
+          ])))
+        fl2.AddResult(set([query.LQ_MODE]), [
+          ("zzz", "end", None, None),
+          ("000", "start", None, None),
+          ] + list(fn2([
+          ("aaa", "b200", None, None),
+          ("aaa", "b300", None, None),
+          ])))
+        result = self.lm.QueryLocks(["name", "mode"])
+        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
+          ] + list(fn([
+          # Name is the same, so order must be equal to incoming order
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
+          ])) + list(fn2([
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
+          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
+          ])) + [
+          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
+          ])
+        for i in [fl1, fl2]:
+          self.assertEqual(i.CountPending(), 0)
+
+
+if __name__ == "__main__":
   testutils.GanetiTestProgram()