Modify gnt-node add to call external script
[ganeti-local] / test / ganeti.locking_unittest.py
index 2300517..3eb2375 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -31,10 +31,12 @@ import threading
 from ganeti import locking
 from ganeti import errors
 
+import testutils
+
 
 # This is used to test the ssynchronize decorator.
 # Since it's passed as input to a decorator it must be declared as a global.
-_decoratorlock = locking.SharedLock()
+_decoratorlock = locking.SharedLock("decorator lock")
 
 #: List for looping tests
 ITERATIONS = range(8)
@@ -48,6 +50,15 @@ def _Repeat(fn):
   return wrapper
 
 
+def SafeSleep(duration):
+  """Block until at least duration seconds have elapsed."""
+  deadline = time.time() + duration
+  remaining = deadline - time.time()
+  while remaining > 0.0:
+    time.sleep(remaining)
+    remaining = deadline - time.time()
+
+
 class _ThreadedTestCase(unittest.TestCase):
   """Test class that supports adding/waiting on threads"""
   def setUp(self):
@@ -79,7 +90,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.cond = cls(self.lock)
 
   def _testAcquireRelease(self):
-    self.assert_(not self.cond._is_owned())
+    self.assertFalse(self.cond._is_owned())
     self.assertRaises(RuntimeError, self.cond.wait)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
@@ -89,7 +100,7 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.assert_(self.cond._is_owned())
     self.cond.release()
 
-    self.assert_(not self.cond._is_owned())
+    self.assertFalse(self.cond._is_owned())
     self.assertRaises(RuntimeError, self.cond.wait)
     self.assertRaises(RuntimeError, self.cond.notifyAll)
 
@@ -111,14 +122,40 @@ class _ConditionTestCase(_ThreadedTestCase):
     self.assertEqual(self.done.get(True, 1), "NN")
     self.assert_(self.cond._is_owned())
     self.cond.release()
-    self.assert_(not self.cond._is_owned())
+    self.assertFalse(self.cond._is_owned())
+
+
+class TestSingleNotifyPipeCondition(_ConditionTestCase):
+  """SingleNotifyPipeCondition tests"""
+
+  def setUp(self):
+    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
+
+  def testAcquireRelease(self):
+    self._testAcquireRelease()
+
+  def testNotification(self):
+    self._testNotification()
+
+  def testWaitReuse(self):
+    self.cond.acquire()
+    self.cond.wait(0)
+    self.cond.wait(0.1)
+    self.cond.release()
+
+  def testNoNotifyReuse(self):
+    self.cond.acquire()
+    self.cond.notifyAll()
+    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.notifyAll)
+    self.cond.release()
 
 
 class TestPipeCondition(_ConditionTestCase):
-  """_PipeCondition tests"""
+  """PipeCondition tests"""
 
   def setUp(self):
-    _ConditionTestCase.setUp(self, locking._PipeCondition)
+    _ConditionTestCase.setUp(self, locking.PipeCondition)
 
   def testAcquireRelease(self):
     self._testAcquireRelease()
@@ -214,113 +251,33 @@ class TestPipeCondition(_ConditionTestCase):
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
 
-class TestSingleActionPipeCondition(unittest.TestCase):
-  """_SingleActionPipeCondition tests"""
-
-  def setUp(self):
-    self.cond = locking._SingleActionPipeCondition()
-
-  def testInitialization(self):
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is not None)
-    self.assert_(self.cond._poller is not None)
-    self.assertEqual(self.cond._nwaiters, 0)
-
-  def testUsageCount(self):
-    self.cond.StartWaiting()
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is not None)
-    self.assert_(self.cond._poller is not None)
-    self.assertEqual(self.cond._nwaiters, 1)
-
-    # use again
-    self.cond.StartWaiting()
-    self.assertEqual(self.cond._nwaiters, 2)
-
-    # there is more than one user
-    self.assert_(not self.cond.DoneWaiting())
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is not None)
-    self.assert_(self.cond._poller is not None)
-    self.assertEqual(self.cond._nwaiters, 1)
-
-    self.assert_(self.cond.DoneWaiting())
-    self.assertEqual(self.cond._nwaiters, 0)
-    self.assert_(self.cond._read_fd is None)
-    self.assert_(self.cond._write_fd is None)
-    self.assert_(self.cond._poller is None)
-
-  def testNotify(self):
-    wait1 = self.cond.StartWaiting()
-    wait2 = self.cond.StartWaiting()
-
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is not None)
-    self.assert_(self.cond._poller is not None)
-
-    self.cond.notifyAll()
-
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is None)
-    self.assert_(self.cond._poller is not None)
-
-    self.assert_(not self.cond.DoneWaiting())
-
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is None)
-    self.assert_(self.cond._poller is not None)
-
-    self.assert_(self.cond.DoneWaiting())
-
-    self.assert_(self.cond._read_fd is None)
-    self.assert_(self.cond._write_fd is None)
-    self.assert_(self.cond._poller is None)
-
-  def testReusage(self):
-    self.cond.StartWaiting()
-    self.assert_(self.cond._read_fd is not None)
-    self.assert_(self.cond._write_fd is not None)
-    self.assert_(self.cond._poller is not None)
-
-    self.assert_(self.cond.DoneWaiting())
-
-    self.assertRaises(RuntimeError, self.cond.StartWaiting)
-    self.assert_(self.cond._read_fd is None)
-    self.assert_(self.cond._write_fd is None)
-    self.assert_(self.cond._poller is None)
-
-  def testNotifyTwice(self):
-    self.cond.notifyAll()
-    self.assertRaises(RuntimeError, self.cond.notifyAll)
-
-
 class TestSharedLock(_ThreadedTestCase):
   """SharedLock tests"""
 
   def setUp(self):
     _ThreadedTestCase.setUp(self)
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock("TestSharedLock")
 
   def testSequenceAndOwnership(self):
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
     self.sl.acquire(shared=1)
     self.assert_(self.sl._is_owned())
     self.assert_(self.sl._is_owned(shared=1))
-    self.assert_(not self.sl._is_owned(shared=0))
+    self.assertFalse(self.sl._is_owned(shared=0))
     self.sl.release()
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
     self.sl.acquire()
     self.assert_(self.sl._is_owned())
-    self.assert_(not self.sl._is_owned(shared=1))
+    self.assertFalse(self.sl._is_owned(shared=1))
     self.assert_(self.sl._is_owned(shared=0))
     self.sl.release()
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
     self.sl.acquire(shared=1)
     self.assert_(self.sl._is_owned())
     self.assert_(self.sl._is_owned(shared=1))
-    self.assert_(not self.sl._is_owned(shared=0))
+    self.assertFalse(self.sl._is_owned(shared=0))
     self.sl.release()
-    self.assert_(not self.sl._is_owned())
+    self.assertFalse(self.sl._is_owned())
 
   def testBooleanValue(self):
     # semaphores are supposed to return a true value on a successful acquire
@@ -393,7 +350,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.sl.release()
     self._waitThreads()
     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testExclusiveBlocksSharer(self):
@@ -421,7 +378,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.sl.release()
     self._waitThreads()
     self.failUnlessEqual(self.done.get_nowait(), 'DEL')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testWaitingExclusiveBlocksSharer(self):
@@ -484,7 +441,7 @@ class TestSharedLock(_ThreadedTestCase):
     # The threads who were pending return ERR
     for _ in range(4):
       self.assertEqual(self.done.get_nowait(), 'ERR')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testDeletePendingDeleteExclusiveSharers(self):
@@ -500,32 +457,38 @@ class TestSharedLock(_ThreadedTestCase):
     self.assertEqual(self.done.get_nowait(), 'ERR')
     self.assertEqual(self.done.get_nowait(), 'ERR')
     self.assertEqual(self.done.get_nowait(), 'ERR')
-    self.sl = locking.SharedLock()
+    self.sl = locking.SharedLock(self.sl.name)
 
   @_Repeat
   def testExclusiveAcquireTimeout(self):
-    def _LockExclusive(wait):
-      self.sl.acquire(shared=0)
-      self.done.put("A: start sleep")
-      time.sleep(wait)
-      self.done.put("A: end sleep")
-      self.sl.release()
-
     for shared in [0, 1]:
-      # Start thread to hold lock for 20 ms
-      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
+      on_queue = threading.Event()
+      release_exclusive = threading.Event()
+
+      def _LockExclusive():
+        self.sl.acquire(shared=0, test_notify=on_queue.set)
+        self.done.put("A: start wait")
+        release_exclusive.wait()
+        self.done.put("A: end wait")
+        self.sl.release()
+
+      # Start thread to hold lock in exclusive mode
+      self._addThread(target=_LockExclusive)
 
-      # Wait for sleep to begin
-      self.assertEqual(self.done.get(), "A: start sleep")
+      # Wait for wait to begin
+      self.assertEqual(self.done.get(timeout=60), "A: start wait")
+
+      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
+      # on the queue
+      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
+                                      test_notify=release_exclusive.set))
 
-      # Wait up to 100 ms to get lock
-      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
       self.done.put("got 2nd")
       self.sl.release()
 
       self._waitThreads()
 
-      self.assertEqual(self.done.get_nowait(), "A: end sleep")
+      self.assertEqual(self.done.get_nowait(), "A: end wait")
       self.assertEqual(self.done.get_nowait(), "got 2nd")
       self.assertRaises(Queue.Empty, self.done.get_nowait)
 
@@ -540,7 +503,7 @@ class TestSharedLock(_ThreadedTestCase):
       self.sl.acquire()
 
       # Start shared acquires with timeout between 0 and 20 ms
-      for i in xrange(11):
+      for i in range(11):
         self._addThread(target=_AcquireWithTimeout,
                         args=(shared, i * 2.0 / 1000.0))
 
@@ -551,7 +514,7 @@ class TestSharedLock(_ThreadedTestCase):
       # Release lock
       self.sl.release()
 
-      for _ in xrange(11):
+      for _ in range(11):
         self.assertEqual(self.done.get_nowait(), "timeout")
 
       self.assertRaises(Queue.Empty, self.done.get_nowait)
@@ -561,44 +524,52 @@ class TestSharedLock(_ThreadedTestCase):
     # Tests whether shared acquires jump in front of exclusive acquires in the
     # queue.
 
-    # Get exclusive lock while we fill the queue
-    self.sl.acquire()
+    def _Acquire(shared, name, notify_ev, wait_ev):
+      if notify_ev:
+        notify_fn = notify_ev.set
+      else:
+        notify_fn = None
+
+      if wait_ev:
+        wait_ev.wait()
 
-    def _Acquire(shared, name):
-      if not self.sl.acquire(shared=shared):
+      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
         return
 
       self.done.put(name)
       self.sl.release()
 
-    # Start shared acquires
-    for _ in xrange(5):
-      self._addThread(target=_Acquire, args=(1, "shared A"))
+    # Get exclusive lock while we fill the queue
+    self.sl.acquire()
+
+    shrcnt1 = 5
+    shrcnt2 = 7
+    shrcnt3 = 9
+    shrcnt4 = 2
+
+    # Add acquires using threading.Event for synchronization. They'll be
+    # acquired exactly in the order defined in this list.
+    acquires = (shrcnt1 * [(1, "shared 1")] +
+                3 * [(0, "exclusive 1")] +
+                shrcnt2 * [(1, "shared 2")] +
+                shrcnt3 * [(1, "shared 3")] +
+                shrcnt4 * [(1, "shared 4")] +
+                3 * [(0, "exclusive 2")])
 
-    # Start exclusive acquires
-    for _ in xrange(3):
-      self._addThread(target=_Acquire, args=(0, "exclusive B"))
+    ev_cur = None
+    ev_prev = None
 
-    # More shared acquires
-    for _ in xrange(5):
-      self._addThread(target=_Acquire, args=(1, "shared C"))
+    for args in acquires:
+      ev_cur = threading.Event()
+      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
+      ev_prev = ev_cur
 
-    # More exclusive acquires
-    for _ in xrange(3):
-      self._addThread(target=_Acquire, args=(0, "exclusive D"))
+    # Wait for last acquire to start
+    ev_prev.wait()
 
     # Expect 6 pending exclusive acquires and 1 for all shared acquires
-    # together. There's no way to wait for SharedLock.acquire to start
-    # its work. Hence the timeout of 2 seconds.
-    pending = 0
-    end_time = time.time() + 2.0
-    while time.time() < end_time:
-      pending = self.sl._count_pending()
-      self.assert_(pending >= 0 and pending <= 7)
-      if pending == 7:
-        break
-      time.sleep(0.05)
-    self.assertEqual(pending, 7)
+    # together
+    self.assertEqual(self.sl._count_pending(), 7)
 
     # Release exclusive lock and wait
     self.sl.release()
@@ -606,30 +577,34 @@ class TestSharedLock(_ThreadedTestCase):
     self._waitThreads()
 
     # Check sequence
-    shr_a = 0
-    shr_c = 0
-    for _ in xrange(10):
+    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
       # Shared locks aren't guaranteed to be notified in order, but they'll be
       # first
       tmp = self.done.get_nowait()
-      if tmp == "shared A":
-        shr_a += 1
-      elif tmp == "shared C":
-        shr_c += 1
-    self.assertEqual(shr_a, 5)
-    self.assertEqual(shr_c, 5)
+      if tmp == "shared 1":
+        shrcnt1 -= 1
+      elif tmp == "shared 2":
+        shrcnt2 -= 1
+      elif tmp == "shared 3":
+        shrcnt3 -= 1
+      elif tmp == "shared 4":
+        shrcnt4 -= 1
+    self.assertEqual(shrcnt1, 0)
+    self.assertEqual(shrcnt2, 0)
+    self.assertEqual(shrcnt3, 0)
+    self.assertEqual(shrcnt4, 0)
 
-    for _ in xrange(3):
-      self.assertEqual(self.done.get_nowait(), "exclusive B")
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), "exclusive 1")
 
-    for _ in xrange(3):
-      self.assertEqual(self.done.get_nowait(), "exclusive D")
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), "exclusive 2")
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
   @_Repeat
   def testMixedAcquireTimeout(self):
-    sync = threading.Condition()
+    sync = threading.Event()
 
     def _AcquireShared(ev):
       if not self.sl.acquire(shared=1, timeout=None):
@@ -640,18 +615,14 @@ class TestSharedLock(_ThreadedTestCase):
       # Notify main thread
       ev.set()
 
-      # Wait for notification
-      sync.acquire()
-      try:
-        sync.wait()
-      finally:
-        sync.release()
+      # Wait for notification from main thread
+      sync.wait()
 
       # Release lock
       self.sl.release()
 
     acquires = []
-    for _ in xrange(3):
+    for _ in range(3):
       ev = threading.Event()
       self._addThread(target=_AcquireShared, args=(ev, ))
       acquires.append(ev)
@@ -666,7 +637,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
 
     # Acquire exclusive without timeout
-    exclsync = threading.Condition()
+    exclsync = threading.Event()
     exclev = threading.Event()
 
     def _AcquireExclusive():
@@ -678,11 +649,8 @@ class TestSharedLock(_ThreadedTestCase):
       # Notify main thread
       exclev.set()
 
-      exclsync.acquire()
-      try:
-        exclsync.wait()
-      finally:
-        exclsync.release()
+      # Wait for notification from main thread
+      exclsync.wait()
 
       self.sl.release()
 
@@ -692,11 +660,7 @@ class TestSharedLock(_ThreadedTestCase):
     self.failIf(self.sl.acquire(shared=0, timeout=0.02))
 
     # Make all shared holders release their locks
-    sync.acquire()
-    try:
-      sync.notifyAll()
-    finally:
-      sync.release()
+    sync.set()
 
     # Wait for exclusive acquire to succeed
     exclev.wait()
@@ -711,15 +675,11 @@ class TestSharedLock(_ThreadedTestCase):
         self.done.put("shared2")
         self.sl.release()
 
-    for _ in xrange(10):
+    for _ in range(10):
       self._addThread(target=_AcquireSharedSimple)
 
     # Tell exclusive lock to release
-    exclsync.acquire()
-    try:
-      exclsync.notifyAll()
-    finally:
-      exclsync.release()
+    exclsync.set()
 
     # Wait for everything to finish
     self._waitThreads()
@@ -727,17 +687,48 @@ class TestSharedLock(_ThreadedTestCase):
     self.assertEqual(self.sl._count_pending(), 0)
 
     # Check sequence
-    for _ in xrange(3):
+    for _ in range(3):
       self.assertEqual(self.done.get_nowait(), "shared")
 
     self.assertEqual(self.done.get_nowait(), "exclusive")
 
-    for _ in xrange(10):
+    for _ in range(10):
       self.assertEqual(self.done.get_nowait(), "shared2")
 
     self.assertRaises(Queue.Empty, self.done.get_nowait)
 
 
+class TestSharedLockInCondition(_ThreadedTestCase):
+  """SharedLock as a condition lock tests"""
+
+  def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    self.sl = locking.SharedLock("TestSharedLockInCondition")
+    self.setCondition()
+
+  def setCondition(self):
+    self.cond = threading.Condition(self.sl)
+
+  def testKeepMode(self):
+    self.cond.acquire(shared=1)
+    self.assert_(self.sl._is_owned(shared=1))
+    self.cond.wait(0)
+    self.assert_(self.sl._is_owned(shared=1))
+    self.cond.release()
+    self.cond.acquire(shared=0)
+    self.assert_(self.sl._is_owned(shared=0))
+    self.cond.wait(0)
+    self.assert_(self.sl._is_owned(shared=0))
+    self.cond.release()
+
+
+class TestSharedLockInPipeCondition(TestSharedLockInCondition):
+  """SharedLock as a pipe condition lock tests"""
+
+  def setCondition(self):
+    self.cond = locking.PipeCondition(self.sl)
+
+
 class TestSSynchronizedDecorator(_ThreadedTestCase):
   """Shared Lock Synchronized decorator test"""
 
@@ -756,9 +747,9 @@ class TestSSynchronizedDecorator(_ThreadedTestCase):
 
   def testDecoratedFunctions(self):
     self._doItExclusive()
-    self.assert_(not _decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock._is_owned())
     self._doItSharer()
-    self.assert_(not _decoratorlock._is_owned())
+    self.assertFalse(_decoratorlock._is_owned())
 
   def testSharersCanCoexist(self):
     _decoratorlock.acquire(shared=1)
@@ -805,11 +796,11 @@ class TestLockSet(_ThreadedTestCase):
   def _setUpLS(self):
     """Helper to (re)initialize the lock set"""
     self.resources = ['one', 'two', 'three']
-    self.ls = locking.LockSet(members=self.resources)
+    self.ls = locking.LockSet(self.resources, "TestLockSet")
 
   def testResources(self):
     self.assertEquals(self.ls._names(), set(self.resources))
-    newls = locking.LockSet()
+    newls = locking.LockSet([], "TestLockSet.testResources")
     self.assertEquals(newls._names(), set())
 
   def testAcquireRelease(self):
@@ -1002,6 +993,127 @@ class TestLockSet(_ThreadedTestCase):
       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
 
   @_Repeat
+  def testSimpleAcquireTimeoutExpiring(self):
+    names = sorted(self.ls._names())
+    self.assert_(len(names) >= 3)
+
+    # Get name of first lock
+    first = names[0]
+
+    # Get name of last lock
+    last = names.pop()
+
+    checks = [
+      # Block first and try to lock it again
+      (first, first),
+
+      # Block first and try to lock all locks
+      (None, first),
+
+      # Block last and try to lock it again
+      (last, last),
+      ]
+
+    for (wanted, block) in checks:
+      # Lock in exclusive mode
+      self.assert_(self.ls.acquire(block, shared=0))
+
+      def _AcquireOne():
+        # Try to get the same lock again with a timeout (should never succeed)
+        acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
+        if acquired:
+          self.done.put("acquired")
+          self.ls.release()
+        else:
+          self.assert_(acquired is None)
+          self.assertFalse(self.ls._list_owned())
+          self.assertFalse(self.ls._is_owned())
+          self.done.put("not acquired")
+
+      self._addThread(target=_AcquireOne)
+
+      # Wait for timeout in thread to expire
+      self._waitThreads()
+
+      # Release exclusive lock again
+      self.ls.release()
+
+      self.assertEqual(self.done.get_nowait(), "not acquired")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
+  def testDelayedAndExpiringLockAcquire(self):
+    self._setUpLS()
+    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
+
+    for expire in (False, True):
+      names = sorted(self.ls._names())
+      self.assertEqual(len(names), 8)
+
+      lock_ev = dict([(i, threading.Event()) for i in names])
+
+      # Lock all in exclusive mode
+      self.assert_(self.ls.acquire(names, shared=0))
+
+      if expire:
+        # We'll wait at least 300ms per lock
+        lockwait = len(names) * [0.3]
+
+        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
+        # this gives us up to 2.4s to fail.
+        lockall_timeout = 0.4
+      else:
+        # This should finish rather quickly
+        lockwait = None
+        lockall_timeout = len(names) * 5.0
+
+      def _LockAll():
+        def acquire_notification(name):
+          if not expire:
+            self.done.put("getting %s" % name)
+
+          # Kick next lock
+          lock_ev[name].set()
+
+        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
+                           test_notify=acquire_notification):
+          self.done.put("got all")
+          self.ls.release()
+        else:
+          self.done.put("timeout on all")
+
+        # Notify all locks
+        for ev in lock_ev.values():
+          ev.set()
+
+      t = self._addThread(target=_LockAll)
+
+      for idx, name in enumerate(names):
+        # Wait for actual acquire on this lock to start
+        lock_ev[name].wait(10.0)
+
+        if expire and t.isAlive():
+          # Wait some time after getting the notification to make sure the lock
+          # acquire will expire
+          SafeSleep(lockwait[idx])
+
+        self.ls.release(names=name)
+
+      self.assertFalse(self.ls._list_owned())
+
+      self._waitThreads()
+
+      if expire:
+        # Not checking which locks were actually acquired. Doing so would be
+        # too timing-dependent.
+        self.assertEqual(self.done.get_nowait(), "timeout on all")
+      else:
+        for i in names:
+          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
+        self.assertEqual(self.done.get_nowait(), "got all")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
   def testConcurrentRemove(self):
     self.ls.add('four')
     self.ls.acquire(['one', 'two', 'four'])
@@ -1176,19 +1288,19 @@ class TestGanetiLockManager(_ThreadedTestCase):
 
   def testInitAndResources(self):
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager()
+    self.GL = locking.GanetiLockManager([], [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
 
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager(nodes=self.nodes)
+    self.GL = locking.GanetiLockManager(self.nodes, [])
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
 
     locking.GanetiLockManager._instance = None
-    self.GL = locking.GanetiLockManager(instances=self.instances)
+    self.GL = locking.GanetiLockManager([], self.instances)
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
@@ -1311,6 +1423,4 @@ class TestGanetiLockManager(_ThreadedTestCase):
 
 
 if __name__ == '__main__':
-  unittest.main()
-  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
-  #unittest.TextTestRunner(verbosity=2).run(suite)
+  testutils.GanetiTestProgram()