# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 0.0510-1301, USA.
+# 02110-1301, USA.
"""Script for unittesting the locking module"""
import Queue
import threading
import random
+import gc
import itertools
+from ganeti import constants
from ganeti import locking
from ganeti import errors
from ganeti import utils
from ganeti import compat
+from ganeti import objects
+from ganeti import query
import testutils
def _testAcquireRelease(self):
self.assertFalse(self.cond._is_owned())
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.acquire()
self.cond.release()
self.assertFalse(self.cond._is_owned())
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
def _testNotification(self):
self._addThread(target=_NotifyAll)
self.assertEqual(self.done.get(True, 1), "NE")
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self.cond.wait()
+ self.cond.wait(None)
self.assertEqual(self.done.get(True, 1), "NA")
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
def testNoNotifyReuse(self):
self.cond.acquire()
self.cond.notifyAll()
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.release()
def _BlockingWait():
self.cond.acquire()
self.done.put("A")
- self.cond.wait()
+ self.cond.wait(None)
self.cond.release()
self.done.put("W")
self.sl = locking.SharedLock("TestSharedLock")
def testSequenceAndOwnership(self):
- self.assertFalse(self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
self.sl.acquire(shared=1)
- self.assert_(self.sl._is_owned())
- self.assert_(self.sl._is_owned(shared=1))
- self.assertFalse(self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned())
+ self.assert_(self.sl.is_owned(shared=1))
+ self.assertFalse(self.sl.is_owned(shared=0))
self.sl.release()
- self.assertFalse(self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
self.sl.acquire()
- self.assert_(self.sl._is_owned())
- self.assertFalse(self.sl._is_owned(shared=1))
- self.assert_(self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned())
+ self.assertFalse(self.sl.is_owned(shared=1))
+ self.assert_(self.sl.is_owned(shared=0))
self.sl.release()
- self.assertFalse(self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
self.sl.acquire(shared=1)
- self.assert_(self.sl._is_owned())
- self.assert_(self.sl._is_owned(shared=1))
- self.assertFalse(self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned())
+ self.assert_(self.sl.is_owned(shared=1))
+ self.assertFalse(self.sl.is_owned(shared=0))
self.sl.release()
- self.assertFalse(self.sl._is_owned())
+ self.assertFalse(self.sl.is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
self.assertRaises(errors.LockError, self.sl.delete)
def testDeleteTimeout(self):
- self.sl.delete(timeout=60)
+ self.assertTrue(self.sl.delete(timeout=60))
+
+ def testDeleteTimeoutFail(self):
+ ready = threading.Event()
+ finish = threading.Event()
+
+ def fn():
+ self.sl.acquire(shared=0)
+ ready.set()
+
+ finish.wait()
+ self.sl.release()
+
+ self._addThread(target=fn)
+ ready.wait()
+
+ # Test if deleting a lock owned in exclusive mode by another thread fails
+ # to delete when a timeout is used
+ self.assertFalse(self.sl.delete(timeout=0.02))
+
+ finish.set()
+ self._waitThreads()
+
+ self.assertTrue(self.sl.delete())
+ self.assertRaises(errors.LockError, self.sl.acquire)
def testNoDeleteIfSharer(self):
self.sl.acquire(shared=1)
self.assertRaises(Queue.Empty, self.done.get_nowait)
+ def testIllegalDowngrade(self):
+ # Not yet acquired
+ self.assertRaises(AssertionError, self.sl.downgrade)
+
+ # Acquire in shared mode, downgrade should be no-op
+ self.assertTrue(self.sl.acquire(shared=1))
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.assertTrue(self.sl.downgrade())
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ def testDowngrade(self):
+ self.assertTrue(self.sl.acquire())
+ self.assertTrue(self.sl.is_owned(shared=0))
+ self.assertTrue(self.sl.downgrade())
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ @_Repeat
+ def testDowngradeJumpsAheadOfExclusive(self):
+ def _KeepExclusive(ev_got, ev_downgrade, ev_release):
+ self.assertTrue(self.sl.acquire())
+ self.assertTrue(self.sl.is_owned(shared=0))
+ ev_got.set()
+ ev_downgrade.wait()
+ self.assertTrue(self.sl.is_owned(shared=0))
+ self.assertTrue(self.sl.downgrade())
+ self.assertTrue(self.sl.is_owned(shared=1))
+ ev_release.wait()
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ def _KeepExclusive2(ev_started, ev_release):
+ self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
+ self.assertTrue(self.sl.is_owned(shared=0))
+ ev_release.wait()
+ self.assertTrue(self.sl.is_owned(shared=0))
+ self.sl.release()
+
+ def _KeepShared(ev_started, ev_got, ev_release):
+ self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
+ self.assertTrue(self.sl.is_owned(shared=1))
+ ev_got.set()
+ ev_release.wait()
+ self.assertTrue(self.sl.is_owned(shared=1))
+ self.sl.release()
+
+ # Acquire lock in exclusive mode
+ ev_got_excl1 = threading.Event()
+ ev_downgrade_excl1 = threading.Event()
+ ev_release_excl1 = threading.Event()
+ th_excl1 = self._addThread(target=_KeepExclusive,
+ args=(ev_got_excl1, ev_downgrade_excl1,
+ ev_release_excl1))
+ ev_got_excl1.wait()
+
+ # Start a second exclusive acquire
+ ev_started_excl2 = threading.Event()
+ ev_release_excl2 = threading.Event()
+ th_excl2 = self._addThread(target=_KeepExclusive2,
+ args=(ev_started_excl2, ev_release_excl2))
+ ev_started_excl2.wait()
+
+ # Start shared acquires, will jump ahead of second exclusive acquire when
+ # first exclusive acquire downgrades
+ ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
+ ev_release_shared = threading.Event()
+
+ th_shared = [self._addThread(target=_KeepShared,
+ args=(ev_started, ev_got, ev_release_shared))
+ for (ev_started, ev_got) in ev_shared]
+
+ # Wait for all shared acquires to start
+ for (ev, _) in ev_shared:
+ ev.wait()
+
+ # Check lock information
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+ [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
+ [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
+ self.assertEqual([(pendmode, sorted(waiting))
+ for (pendmode, waiting) in pending],
+ [("exclusive", [th_excl2.getName()]),
+ ("shared", sorted(th.getName() for th in th_shared))])
+
+ # Shared acquires won't start until the exclusive lock is downgraded
+ ev_downgrade_excl1.set()
+
+ # Wait for all shared acquires to be successful
+ for (_, ev) in ev_shared:
+ ev.wait()
+
+ # Check lock information again
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
+ query.LQ_PENDING])),
+ [(self.sl.name, "shared", None,
+ [("exclusive", [th_excl2.getName()])])])
+ [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
+ self.assertEqual(set(owner), set([th_excl1.getName()] +
+ [th.getName() for th in th_shared]))
+
+ ev_release_excl1.set()
+ ev_release_excl2.set()
+ ev_release_shared.set()
+
+ self._waitThreads()
+
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
+ query.LQ_PENDING])),
+ [(self.sl.name, None, None, [])])
+
@_Repeat
def testMixedAcquireTimeout(self):
sync = threading.Event()
prev.wait()
# Check lock information
- self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
- self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
- ["exclusive", [threading.currentThread().getName()]])
- self.assertEqual(self.sl.GetInfo(["name", "pending"]),
- [self.sl.name,
- [(["exclusive", "shared"][int(bool(shared))],
- sorted([t.getName() for t in threads]))
- for acquires in [perprio[i]
- for i in sorted(perprio.keys())]
- for (shared, _, threads) in acquires]])
+ self.assertEqual(self.sl.GetLockInfo(set()),
+ [(self.sl.name, None, None, None)])
+ self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+ [(self.sl.name, "exclusive",
+ [threading.currentThread().getName()], None)])
+
+ self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
+ perprio)
# Let threads acquire the lock
self.sl.release()
self.assertRaises(Queue.Empty, self.done.get_nowait)
+ def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
+ self.assertEqual(name, self.sl.name)
+ self.assert_(mode is None)
+ self.assert_(owner is None)
+
+ self.assertEqual([(pendmode, sorted(waiting))
+ for (pendmode, waiting) in pending],
+ [(["exclusive", "shared"][int(bool(shared))],
+ sorted(t.getName() for t in threads))
+ for acquires in [perprio[i]
+ for i in sorted(perprio.keys())]
+ for (shared, _, threads) in acquires])
+
+ class _FakeTimeForSpuriousNotifications:
+ def __init__(self, now, check_end):
+ self.now = now
+ self.check_end = check_end
+
+ # Deterministic random number generator
+ self.rnd = random.Random(15086)
+
+ def time(self):
+ # Advance time if the random number generator thinks so (this is to test
+ # multiple notifications without advancing the time)
+ if self.rnd.random() < 0.3:
+ self.now += self.rnd.random()
+
+ self.check_end(self.now)
+
+ return self.now
+
+ @_Repeat
+ def testAcquireTimeoutWithSpuriousNotifications(self):
+ ready = threading.Event()
+ locked = threading.Event()
+ req = Queue.Queue(0)
+
+ epoch = 4000.0
+ timeout = 60.0
+
+ def check_end(now):
+ self.assertFalse(locked.isSet())
+
+ # If we waited long enough (in virtual time), tell main thread to release
+ # lock, otherwise tell it to notify once more
+ req.put(now < (epoch + (timeout * 0.8)))
+
+ time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+ sl = locking.SharedLock("test", _time_fn=time_fn)
+
+ # Acquire in exclusive mode
+ sl.acquire(shared=0)
+
+ def fn():
+ self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+ test_notify=ready.set))
+ locked.set()
+ sl.release()
+ self.done.put("success")
+
+ # Start acquire with timeout and wait for it to be ready
+ self._addThread(target=fn)
+ ready.wait()
+
+ # The separate thread is now waiting to acquire the lock, so start sending
+ # spurious notifications.
+
+ # Wait for separate thread to ask for another notification
+ count = 0
+ while req.get():
+ # After sending the notification, the lock will take a short amount of
+ # time to notice and to retrieve the current time
+ sl._notify_topmost()
+ count += 1
+
+ self.assertTrue(count > 100, "Not enough notifications were sent")
+
+ self.assertFalse(locked.isSet())
+
+ # Some notifications have been sent, now actually release the lock
+ sl.release()
+
+ # Wait for lock to be acquired
+ locked.wait()
+
+ self._waitThreads()
+
+ self.assertEqual(self.done.get_nowait(), "success")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""
def testKeepMode(self):
self.cond.acquire(shared=1)
- self.assert_(self.sl._is_owned(shared=1))
+ self.assert_(self.sl.is_owned(shared=1))
self.cond.wait(0)
- self.assert_(self.sl._is_owned(shared=1))
+ self.assert_(self.sl.is_owned(shared=1))
self.cond.release()
self.cond.acquire(shared=0)
- self.assert_(self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned(shared=0))
self.cond.wait(0)
- self.assert_(self.sl._is_owned(shared=0))
+ self.assert_(self.sl.is_owned(shared=0))
self.cond.release()
@locking.ssynchronized(_decoratorlock)
def _doItExclusive(self):
- self.assert_(_decoratorlock._is_owned())
+ self.assert_(_decoratorlock.is_owned())
self.done.put('EXC')
@locking.ssynchronized(_decoratorlock, shared=1)
def _doItSharer(self):
- self.assert_(_decoratorlock._is_owned(shared=1))
+ self.assert_(_decoratorlock.is_owned(shared=1))
self.done.put('SHR')
def testDecoratedFunctions(self):
self._doItExclusive()
- self.assertFalse(_decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock.is_owned())
self._doItSharer()
- self.assertFalse(_decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock.is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
newls = locking.LockSet([], "TestLockSet.testResources")
self.assertEquals(newls._names(), set())
+ def testCheckOwnedUnknown(self):
+ self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
+ for shared in [-1, 0, 1, 6378, 24255]:
+ self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
+ shared=shared))
+
+ def testCheckOwnedUnknownWhileHolding(self):
+ self.assertFalse(self.ls.check_owned([]))
+ self.ls.acquire("one", shared=1)
+ self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertFalse(self.ls.check_owned(["one", "two"]))
+ self.assertRaises(errors.LockError, self.ls.check_owned,
+ ["one", "nonexist"])
+ self.assertRaises(errors.LockError, self.ls.check_owned, "")
+ self.ls.release()
+ self.assertFalse(self.ls.check_owned([]))
+ self.assertFalse(self.ls.check_owned("one"))
+
def testAcquireRelease(self):
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
self.assert_(self.ls.acquire('one'))
- self.assertEquals(self.ls._list_owned(), set(['one']))
+ self.assertEquals(self.ls.list_owned(), set(['one']))
+ self.assertTrue(self.ls.check_owned("one"))
+ self.assertTrue(self.ls.check_owned("one", shared=0))
+ self.assertFalse(self.ls.check_owned("one", shared=1))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
self.assertEquals(self.ls.acquire(['one']), set(['one']))
- self.assertEquals(self.ls._list_owned(), set(['one']))
+ self.assertEquals(self.ls.list_owned(), set(['one']))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
self.ls.acquire(['one', 'two', 'three'])
- self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
+ self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+ self.assertTrue(self.ls.check_owned(self.ls._names()))
+ self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+ self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
self.ls.release('one')
- self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
+ self.assertFalse(self.ls.check_owned(["one"]))
+ self.assertTrue(self.ls.check_owned(["two", "three"]))
+ self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
+ self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
+ self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
self.ls.release(['three'])
- self.assertEquals(self.ls._list_owned(), set(['two']))
+ self.assertEquals(self.ls.list_owned(), set(['two']))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
- self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+ self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
self.ls.release()
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
+ for name in self.ls._names():
+ self.assertFalse(self.ls.check_owned(name))
def testNoDoubleAcquire(self):
self.ls.acquire('one')
def testAddRemove(self):
self.ls.add('four')
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
self.assert_('four' in self.ls._names())
self.ls.add(['five', 'six', 'seven'], acquired=1)
self.assert_('five' in self.ls._names())
self.assert_('six' in self.ls._names())
self.assert_('seven' in self.ls._names())
- self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
+ self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven']))
self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
self.assert_('five' not in self.ls._names())
self.assert_('six' not in self.ls._names())
- self.assertEquals(self.ls._list_owned(), set(['seven']))
+ self.assertEquals(self.ls.list_owned(), set(['seven']))
self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
self.ls.remove('seven')
self.assert_('seven' not in self.ls._names())
- self.assertEquals(self.ls._list_owned(), set([]))
+ self.assertEquals(self.ls.list_owned(), set([]))
self.ls.acquire(None, shared=1)
self.assertRaises(AssertionError, self.ls.add, 'eight')
self.ls.release()
self.ls.acquire(None)
self.ls.add('eight', acquired=1)
self.assert_('eight' in self.ls._names())
- self.assert_('eight' in self.ls._list_owned())
+ self.assert_('eight' in self.ls.list_owned())
self.ls.add('nine')
self.assert_('nine' in self.ls._names())
- self.assert_('nine' not in self.ls._list_owned())
+ self.assert_('nine' not in self.ls.list_owned())
self.ls.release()
self.ls.remove(['two'])
self.assert_('two' not in self.ls._names())
def testAcquireSetLock(self):
# acquire the set-lock exclusively
self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
- self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
- self.assertEquals(self.ls._is_owned(), True)
+ self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+ self.assertEquals(self.ls.is_owned(), True)
self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
# I can still add/remove elements...
self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
set(['two', 'two', 'three']))
self.ls.release(['two', 'two'])
- self.assertEquals(self.ls._list_owned(), set(['three']))
+ self.assertEquals(self.ls.list_owned(), set(['three']))
def testEmptyAcquire(self):
# Acquire an empty list of locks...
self.assertEquals(self.ls.acquire([]), set())
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
# New locks can still be addded
self.assert_(self.ls.add('six'))
# "re-acquiring" is not an issue, since we had really acquired nothing
self.assertEquals(self.ls.acquire([], shared=1), set())
- self.assertEquals(self.ls._list_owned(), set())
+ self.assertEquals(self.ls.list_owned(), set())
# We haven't really acquired anything, so we cannot release
self.assertRaises(AssertionError, self.ls.release)
self.ls.release()
else:
self.assert_(acquired is None)
- self.assertFalse(self.ls._list_owned())
- self.assertFalse(self.ls._is_owned())
+ self.assertFalse(self.ls.list_owned())
+ self.assertFalse(self.ls.is_owned())
self.done.put("not acquired")
self._addThread(target=_AcquireOne)
self.ls.release(names=name)
- self.assertFalse(self.ls._list_owned())
+ self.assertFalse(self.ls.list_owned())
self._waitThreads()
self.ls.add('four')
self.ls.add('five', acquired=1)
self.ls.add('six', acquired=1, shared=1)
- self.assertEquals(self.ls._list_owned(),
+ self.assertEquals(self.ls.list_owned(),
set(['one', 'two', 'three', 'five', 'six']))
- self.assertEquals(self.ls._is_owned(), True)
+ self.assertEquals(self.ls.is_owned(), True)
self.assertEquals(self.ls._names(),
set(['one', 'two', 'three', 'four', 'five', 'six']))
self.ls.release()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._setUpLS()
+ def testAcquireWithNamesDowngrade(self):
+ self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
+ self.assertTrue(self.ls.is_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+ self.ls.release()
+ self.assertFalse(self.ls.is_owned())
+ self.assertFalse(self.ls._get_lock().is_owned())
+ # Can't downgrade after releasing
+ self.assertRaises(AssertionError, self.ls.downgrade, "two")
+
+ def testDowngrade(self):
+ # Not owning anything, must raise an exception
+ self.assertFalse(self.ls.is_owned())
+ self.assertRaises(AssertionError, self.ls.downgrade)
+
+ self.assertFalse(compat.any(i.is_owned()
+ for i in self.ls._get_lockdict().values()))
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
+ for name in self.ls._names():
+ self.assertFalse(self.ls.check_owned(name))
+
+ self.assertEquals(self.ls.acquire(None, shared=0),
+ set(["one", "two", "three"]))
+ self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
+
+ self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name))
+ self.assertTrue(self.ls.check_owned(name, shared=0))
+ self.assertFalse(self.ls.check_owned(name, shared=1))
+
+ self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+ self.assertTrue(compat.all(i.is_owned(shared=0)
+ for i in self.ls._get_lockdict().values()))
+
+ # Start downgrading locks
+ self.assertTrue(self.ls.downgrade(names=["one"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+ self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
+ for name, lock in
+ self.ls._get_lockdict().items()))
+
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertTrue(self.ls.check_owned("two", shared=0))
+ self.assertTrue(self.ls.check_owned("three", shared=0))
+
+ # Downgrade second lock
+ self.assertTrue(self.ls.downgrade(names="two"))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=0))
+ should_share = lambda name: [0, 1][int(name in ("one", "two"))]
+ self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
+ for name, lock in
+ self.ls._get_lockdict().items()))
+
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertFalse(self.ls.check_owned("two", shared=0))
+ self.assertTrue(self.ls.check_owned("two", shared=1))
+ self.assertTrue(self.ls.check_owned("three", shared=0))
+
+ # Downgrading the last exclusive lock to shared must downgrade the
+ # lockset-internal lock too
+ self.assertTrue(self.ls.downgrade(names="three"))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.assertTrue(compat.all(i.is_owned(shared=1)
+ for i in self.ls._get_lockdict().values()))
+
+ # Verify owned locks
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name, shared=1))
+
+ # Downgrading a shared lock must be a no-op
+ self.assertTrue(self.ls.downgrade(names=["one", "three"]))
+ self.assertTrue(self.ls._get_lock().is_owned(shared=1))
+ self.assertTrue(compat.all(i.is_owned(shared=1)
+ for i in self.ls._get_lockdict().values()))
+
+ self.ls.release()
+
def testPriority(self):
def _Acquire(prev, next, name, priority, success_fn):
prev.wait()
def testAcquireRelease(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
- self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
+ self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
+ shared=1))
+ self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
self.GL.release(locking.LEVEL_NODE, ['n2'])
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
self.GL.release(locking.LEVEL_NODE)
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.release(locking.LEVEL_INSTANCE)
self.assertRaises(errors.LockError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i5'])
self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))
def testAcquireWholeSets(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
set(self.nodegroups))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
set(self.nodegroups))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
set(self.nodes))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
set(self.nodes))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
set(['n2']))
- self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+ self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
set(['n2']))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_INSTANCE)
locks.append(locking.SharedLock(name, monitor=self.lm))
self.assertEqual(len(self.lm._locks), len(locks))
-
- self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
- 100)
+ result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+ self.assertEqual(len(result.fields), 1)
+ self.assertEqual(len(result.data), 100)
# Delete all locks
del locks[:]
# Check order in which locks were added
self.assertEqual([i.name for i in locks], expnames)
- # Sync queries are not supported
- self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
-
# Check query result
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
- False),
- [[name, None, None, []]
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assert_(isinstance(result, dict))
+ response = objects.QueryResponse.FromDict(result)
+ self.assertEqual(response.data,
+ [[(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, [])]
for name in utils.NiceSort(expnames)])
+ self.assertEqual(len(response.fields), 4)
+ self.assertEqual(["name", "mode", "owner", "pending"],
+ [fdef.name for fdef in response.fields])
# Test exclusive acquire
for tlock in locks[::4]:
try:
def _GetExpResult(name):
if tlock.name == name:
- return [name, "exclusive", [threading.currentThread().getName()],
- []]
- return [name, None, None, []]
-
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
- "pending"], False),
+ return [(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.RS_NORMAL, [])]
+ return [(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, [])]
+
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
[_GetExpResult(name)
for name in utils.NiceSort(expnames)])
finally:
i.wait()
# Check query result
- for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
- "owner"], False):
- if name == tlock1.name:
- self.assertEqual(mode, "shared")
- self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ response = objects.QueryResponse.FromDict(result)
+ for (name, mode, owner) in response.data:
+ (name_status, name_value) = name
+ (owner_status, owner_value) = owner
+
+ self.assertEqual(name_status, constants.RS_NORMAL)
+ self.assertEqual(owner_status, constants.RS_NORMAL)
+
+ if name_value == tlock1.name:
+ self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+ self.assertEqual(set(owner_value),
+ set(i.getName() for i in tthreads1))
continue
- if name == tlock2.name:
- self.assertEqual(mode, "shared")
- self.assertEqual(owner, [tthread2.getName()])
+ if name_value == tlock2.name:
+ self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
+ self.assertEqual(owner_value, [tthread2.getName()])
continue
- if name == tlock3.name:
- self.assertEqual(mode, "exclusive")
- self.assertEqual(owner, [tthread3.getName()])
+ if name_value == tlock3.name:
+ self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
+ self.assertEqual(owner_value, [tthread3.getName()])
continue
- self.assert_(name in expnames)
- self.assert_(mode is None)
- self.assert_(owner is None)
+ self.assert_(name_value in expnames)
+ self.assertEqual(mode, (constants.RS_NORMAL, None))
+ self.assert_(owner_value is None)
# Release locks again
releaseev.set()
self._waitThreads()
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[name, None, None]
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]
for name in utils.NiceSort(expnames)])
def testDelete(self):
lock = locking.SharedLock("TestLock", monitor=self.lm)
self.assertEqual(len(self.lm._locks), 1)
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[lock.name, None, None]])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]])
lock.delete()
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[lock.name, "deleted", None]])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, "deleted"),
+ (constants.RS_NORMAL, None)]])
self.assertEqual(len(self.lm._locks), 1)
def testPending(self):
lock.acquire()
try:
self.assertEqual(len(self.lm._locks), 1)
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[lock.name, "exclusive",
- [threading.currentThread().getName()]]])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL,
+ [threading.currentThread().getName()])]])
threads = []
# All acquires are waiting now
if shared:
- pending = [("shared", sorted([t.getName() for t in threads]))]
+ pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
else:
pending = [("exclusive", [t.getName()]) for t in threads]
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
- "pending"], False),
- [[lock.name, "exclusive",
- [threading.currentThread().getName()],
- pending]])
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.RS_NORMAL, pending)]])
self.assertEqual(len(self.lm._locks), 1)
finally:
self._waitThreads()
# No pending acquires
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
- False),
- [[lock.name, None, None, []]])
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lock.name),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, [])]])
self.assertEqual(len(self.lm._locks), 1)
+ def testDeleteAndRecreate(self):
+ lname = "TestLock101923193"
+
+ # Create some locks with the same name and keep all references
+ locks = [locking.SharedLock(lname, monitor=self.lm)
+ for _ in range(5)]
+
+ self.assertEqual(len(self.lm._locks), len(locks))
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]] * 5)
+
+ locks[2].delete()
+
+ # Check information order
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]] * 2 +
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, "deleted"),
+ (constants.RS_NORMAL, None)]] +
+ [[(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)]] * 2)
+
+ locks[1].acquire(shared=0)
+
+ last_status = [
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, "exclusive"),
+ (constants.RS_NORMAL, [threading.currentThread().getName()])],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, "deleted"),
+ (constants.RS_NORMAL, None)],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)],
+ [(constants.RS_NORMAL, lname),
+ (constants.RS_NORMAL, None),
+ (constants.RS_NORMAL, None)],
+ ]
+
+ # Check information order
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
+
+ self.assertEqual(len(set(self.lm._locks.values())), len(locks))
+ self.assertEqual(len(self.lm._locks), len(locks))
+
+ # Check lock deletion
+ for idx in range(len(locks)):
+ del locks[0]
+ assert gc.isenabled()
+ gc.collect()
+ self.assertEqual(len(self.lm._locks), len(locks))
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ last_status[idx + 1:])
+
+ # All locks should have been deleted
+ assert not locks
+ self.assertFalse(self.lm._locks)
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+
+ class _FakeLock:
+ def __init__(self):
+ self._info = []
+
+ def AddResult(self, *args):
+ self._info.append(args)
+
+ def CountPending(self):
+ return len(self._info)
+
+ def GetLockInfo(self, requested):
+ (exp_requested, result) = self._info.pop(0)
+
+ if exp_requested != requested:
+ raise Exception("Requested information (%s) does not match"
+ " expectations (%s)" % (requested, exp_requested))
+
+ return result
+
+ def testMultipleResults(self):
+ fl1 = self._FakeLock()
+ fl2 = self._FakeLock()
+
+ self.lm.RegisterLock(fl1)
+ self.lm.RegisterLock(fl2)
+
+ # Empty information
+ for i in [fl1, fl2]:
+ i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
+ for i in [fl1, fl2]:
+ self.assertEqual(i.CountPending(), 0)
+
+ # Check ordering
+ for fn in [lambda x: x, reversed, sorted]:
+ fl1.AddResult(set(), list(fn([
+ ("aaa", None, None, None),
+ ("bbb", None, None, None),
+ ])))
+ fl2.AddResult(set(), [])
+ result = self.lm.QueryLocks(["name"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+ [(constants.RS_NORMAL, "aaa")],
+ [(constants.RS_NORMAL, "bbb")],
+ ])
+ for i in [fl1, fl2]:
+ self.assertEqual(i.CountPending(), 0)
+
+ for fn2 in [lambda x: x, reversed, sorted]:
+ fl1.AddResult(set([query.LQ_MODE]), list(fn([
+ # Same name, but different information
+ ("aaa", "mode0", None, None),
+ ("aaa", "mode1", None, None),
+ ("aaa", "mode2", None, None),
+ ("aaa", "mode3", None, None),
+ ])))
+ fl2.AddResult(set([query.LQ_MODE]), [
+ ("zzz", "end", None, None),
+ ("000", "start", None, None),
+ ] + list(fn2([
+ ("aaa", "b200", None, None),
+ ("aaa", "b300", None, None),
+ ])))
+ result = self.lm.QueryLocks(["name", "mode"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data, [
+ [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
+ ] + list(fn([
+ # Name is the same, so order must be equal to incoming order
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
+ ])) + list(fn2([
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
+ [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
+ ])) + [
+ [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
+ ])
+ for i in [fl1, fl2]:
+ self.assertEqual(i.CountPending(), 0)
+
if __name__ == '__main__':
testutils.GanetiTestProgram()