#!/usr/bin/python
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import time
import Queue
import threading
+import random
+import itertools
+from ganeti import constants
from ganeti import locking
from ganeti import errors
+from ganeti import utils
+from ganeti import compat
+from ganeti import objects
+from ganeti import query
import testutils
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
-_decoratorlock = locking.SharedLock()
+_decoratorlock = locking.SharedLock("decorator lock")
#: List for looping tests
ITERATIONS = range(8)
self.cond = cls(self.lock)
def _testAcquireRelease(self):
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
class TestSingleNotifyPipeCondition(_ConditionTestCase):
self._testNotification()
def _TestWait(self, fn):
- self._addThread(target=fn)
- self._addThread(target=fn)
- self._addThread(target=fn)
+ threads = [
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ ]
# Wait for threads to be waiting
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
+ for _ in threads:
+ self.assertEqual(self.done.get(True, 1), "A")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.acquire()
- self.assertEqual(self.cond._nwaiters, 3)
- # This new thread can"t acquire the lock, and thus call wait, before we
+ self.assertEqual(len(self.cond._waiters), 3)
+ self.assertEqual(self.cond._waiters, set(threads))
+ # This new thread can't acquire the lock, and thus call wait, before we
# release it
self._addThread(target=fn)
self.cond.notifyAll()
def setUp(self):
_ThreadedTestCase.setUp(self)
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock("TestSharedLock")
def testSequenceAndOwnership(self):
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assertFalse(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
- self.assert_(not self.sl._is_owned(shared=1))
+ self.assertFalse(self.sl._is_owned(shared=1))
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assertFalse(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveBlocksSharer(self):
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testWaitingExclusiveBlocksSharer(self):
# The threads who were pending return ERR
for _ in range(4):
self.assertEqual(self.done.get_nowait(), 'ERR')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testDeletePendingDeleteExclusiveSharers(self):
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
- self.sl = locking.SharedLock()
+ self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveAcquireTimeout(self):
self.assertRaises(Queue.Empty, self.done.get_nowait)
+ def testPriority(self):
+ # Acquire in exclusive mode
+ self.assert_(self.sl.acquire(shared=0))
+
+ # Queue acquires
+ def _Acquire(prev, next, shared, priority, result):
+ prev.wait()
+ self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
+ try:
+ self.done.put(result)
+ finally:
+ self.sl.release()
+
+ counter = itertools.count(0)
+ priorities = range(-20, 30)
+ first = threading.Event()
+ prev = first
+
+ # Data structure:
+ # {
+ # priority:
+ # [(shared/exclusive, set(acquire names), set(pending threads)),
+ # (shared/exclusive, ...),
+ # ...,
+ # ],
+ # }
+ perprio = {}
+
+ # References shared acquire per priority in L{perprio}. Data structure:
+ # {
+ # priority: (shared=1, set(acquire names), set(pending threads)),
+ # }
+ prioshared = {}
+
+ for seed in [4979, 9523, 14902, 32440]:
+ # Use a deterministic random generator
+ rnd = random.Random(seed)
+ for priority in [rnd.choice(priorities) for _ in range(30)]:
+ modes = [0, 1]
+ rnd.shuffle(modes)
+ for shared in modes:
+ # Unique name
+ acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
+
+ ev = threading.Event()
+ thread = self._addThread(target=_Acquire,
+ args=(prev, ev, shared, priority, acqname))
+ prev = ev
+
+      # Record expected acquire, see above for structure
+ data = (shared, set([acqname]), set([thread]))
+ priolist = perprio.setdefault(priority, [])
+ if shared:
+ priosh = prioshared.get(priority, None)
+ if priosh:
+ # Shared acquires are merged
+ for i, j in zip(priosh[1:], data[1:]):
+ i.update(j)
+ assert data[0] == priosh[0]
+ else:
+ prioshared[priority] = data
+ priolist.append(data)
+ else:
+ priolist.append(data)
+
+ # Start all acquires and wait for them
+ first.set()
+ prev.wait()
+
+ # Check lock information
+ self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
+ self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+ (self.sl.name, "exclusive",
+ [threading.currentThread().getName()], None))
+
+ self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
+
+ # Let threads acquire the lock
+ self.sl.release()
+
+ # Wait for everything to finish
+ self._waitThreads()
+
+ self.assert_(self.sl._check_empty())
+
+ # Check acquires by priority
+ for acquires in [perprio[i] for i in sorted(perprio.keys())]:
+ for (_, names, _) in acquires:
+ # For shared acquires, the set will contain 1..n entries. For exclusive
+ # acquires only one.
+ while names:
+ names.remove(self.done.get_nowait())
+ self.assertFalse(compat.any(names for (_, names, _) in acquires))
+
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
+ self.assertEqual(name, self.sl.name)
+ self.assert_(mode is None)
+ self.assert_(owner is None)
+
+ self.assertEqual([(pendmode, sorted(waiting))
+ for (pendmode, waiting) in pending],
+ [(["exclusive", "shared"][int(bool(shared))],
+ sorted(t.getName() for t in threads))
+ for acquires in [perprio[i]
+ for i in sorted(perprio.keys())]
+ for (shared, _, threads) in acquires])
+
+
+class TestSharedLockInCondition(_ThreadedTestCase):
+ """SharedLock as a condition lock tests"""
+
+ def setUp(self):
+ _ThreadedTestCase.setUp(self)
+ self.sl = locking.SharedLock("TestSharedLockInCondition")
+ self.setCondition()
+
+ def setCondition(self):
+ self.cond = threading.Condition(self.sl)
+
+ def testKeepMode(self):
+ self.cond.acquire(shared=1)
+ self.assert_(self.sl._is_owned(shared=1))
+ self.cond.wait(0)
+ self.assert_(self.sl._is_owned(shared=1))
+ self.cond.release()
+ self.cond.acquire(shared=0)
+ self.assert_(self.sl._is_owned(shared=0))
+ self.cond.wait(0)
+ self.assert_(self.sl._is_owned(shared=0))
+ self.cond.release()
+
+
+class TestSharedLockInPipeCondition(TestSharedLockInCondition):
+ """SharedLock as a pipe condition lock tests"""
+
+ def setCondition(self):
+ self.cond = locking.PipeCondition(self.sl)
+
class TestSSynchronizedDecorator(_ThreadedTestCase):
"""Shared Lock Synchronized decorator test"""
def testDecoratedFunctions(self):
self._doItExclusive()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock._is_owned())
self._doItSharer()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock._is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
def _setUpLS(self):
"""Helper to (re)initialize the lock set"""
self.resources = ['one', 'two', 'three']
- self.ls = locking.LockSet(members=self.resources)
+ self.ls = locking.LockSet(self.resources, "TestLockSet")
def testResources(self):
self.assertEquals(self.ls._names(), set(self.resources))
- newls = locking.LockSet()
+ newls = locking.LockSet([], "TestLockSet.testResources")
self.assertEquals(newls._names(), set())
def testAcquireRelease(self):
self.ls.release()
else:
self.assert_(acquired is None)
- self.assert_(not self.ls._list_owned())
- self.assert_(not self.ls._is_owned())
+ self.assertFalse(self.ls._list_owned())
+ self.assertFalse(self.ls._is_owned())
self.done.put("not acquired")
self._addThread(target=_AcquireOne)
self.ls.release(names=name)
- self.assert_(not self.ls._list_owned())
+ self.assertFalse(self.ls._list_owned())
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._setUpLS()
+ def testPriority(self):
+ def _Acquire(prev, next, name, priority, success_fn):
+ prev.wait()
+ self.assert_(self.ls.acquire(name, shared=0,
+ priority=priority,
+ test_notify=lambda _: next.set()))
+ try:
+ success_fn()
+ finally:
+ self.ls.release()
+
+ # Get all in exclusive mode
+ self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
+
+ done_two = Queue.Queue(0)
+
+ first = threading.Event()
+ prev = first
+
+ acquires = [("one", prio, self.done) for prio in range(1, 33)]
+ acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
+
+ # Use a deterministic random generator
+ random.Random(741).shuffle(acquires)
+
+ for (name, prio, done) in acquires:
+ ev = threading.Event()
+ self._addThread(target=_Acquire,
+ args=(prev, ev, name, prio,
+ compat.partial(done.put, "Prio%s" % prio)))
+ prev = ev
+
+ # Start acquires
+ first.set()
+
+ # Wait for last acquire to start
+ prev.wait()
+
+ # Let threads acquire locks
+ self.ls.release()
+
+ # Wait for threads to finish
+ self._waitThreads()
+
+ for i in range(1, 33):
+ self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
+ self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
+
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+ self.assertRaises(Queue.Empty, done_two.get_nowait)
+
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
_ThreadedTestCase.setUp(self)
self.nodes=['n1', 'n2']
+ self.nodegroups=['g1', 'g2']
self.instances=['i1', 'i2', 'i3']
- self.GL = locking.GanetiLockManager(nodes=self.nodes,
- instances=self.instances)
+ self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
+ self.instances)
def tearDown(self):
# Don't try this at home...
self.assertEqual(i, locking.LEVELS[i])
def testDoubleGLFails(self):
- self.assertRaises(AssertionError, locking.GanetiLockManager)
+ self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
def testLockNames(self):
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+ set(self.nodegroups))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
def testInitAndResources(self):
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager()
+ self.GL = locking.GanetiLockManager([], [], [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager(nodes=self.nodes)
+ self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
+ set(self.nodegroups))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
- self.GL = locking.GanetiLockManager(instances=self.instances)
+ self.GL = locking.GanetiLockManager([], [], self.instances)
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
+ self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
self.GL.release(locking.LEVEL_NODE, ['n2'])
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
+ self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
self.GL.release(locking.LEVEL_NODE)
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
+ self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
+ self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.release(locking.LEVEL_INSTANCE)
self.assertRaises(errors.LockError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i5'])
set(self.instances))
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
+ self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
+ set(self.nodegroups))
+ self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
+ set(self.nodegroups))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
set(self.nodes))
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
set(self.nodes))
self.GL.release(locking.LEVEL_NODE)
+ self.GL.release(locking.LEVEL_NODEGROUP)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
locking.LEVEL_NODE, ['n1', 'n2'])
self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i3'])
+ self.assertRaises(AssertionError, self.GL.acquire,
+ locking.LEVEL_NODEGROUP, ['g1'])
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.GL.acquire(locking.LEVEL_NODE, ['n1'])
self.assertRaises(AssertionError, self.GL.release,
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER)
self.GL.release(locking.LEVEL_INSTANCE)
+ self.GL.acquire(locking.LEVEL_NODEGROUP, None)
+ self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
+ self.assertRaises(AssertionError, self.GL.release,
+ locking.LEVEL_CLUSTER, ['BGL'])
+ self.assertRaises(AssertionError, self.GL.release,
+ locking.LEVEL_CLUSTER)
+ self.GL.release(locking.LEVEL_NODEGROUP)
+ self.GL.release(locking.LEVEL_CLUSTER)
def testWrongOrder(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_NODE, ['n1'])
self.assertRaises(AssertionError, self.GL.acquire,
+ locking.LEVEL_NODEGROUP, ['g1'])
+ self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i2'])
+ def testModifiableLevels(self):
+ self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
+ ['BGL2'])
+ self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
+ self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
+ self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
+ self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
+ self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
+ self.GL.add(locking.LEVEL_NODE, ['n3'])
+ self.GL.remove(locking.LEVEL_NODE, ['n1'])
+ self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
+ self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
+ self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
+ self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
+ self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
+ self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
+ ['BGL2'])
+
# Helper function to run as a thread that shared the BGL and then acquires
# some locks at another level.
def _doLock(self, level, names, shared):
self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
+class TestLockMonitor(_ThreadedTestCase):
+ def setUp(self):
+ _ThreadedTestCase.setUp(self)
+ self.lm = locking.LockMonitor()
+
+ def testSingleThread(self):
+ locks = []
+
+ for i in range(100):
+ name = "TestLock%s" % i
+ locks.append(locking.SharedLock(name, monitor=self.lm))
+
+ self.assertEqual(len(self.lm._locks), len(locks))
+ result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+ self.assertEqual(len(result.fields), 1)
+ self.assertEqual(len(result.data), 100)
+
+ # Delete all locks
+ del locks[:]
+
+    # The garbage collector might need some time
+ def _CheckLocks():
+ if self.lm._locks:
+ raise utils.RetryAgain()
+
+ utils.Retry(_CheckLocks, 0.1, 30.0)
+
+ self.assertFalse(self.lm._locks)
+
+ def testMultiThread(self):
+ locks = []
+
+ def _CreateLock(prev, next, name):
+ prev.wait()
+ locks.append(locking.SharedLock(name, monitor=self.lm))
+ if next:
+ next.set()
+
+ expnames = []
+
+ first = threading.Event()
+ prev = first
+
+ # Use a deterministic random generator
+ for i in random.Random(4263).sample(range(100), 33):
+ name = "MtTestLock%s" % i
+ expnames.append(name)
+
+ ev = threading.Event()
+ self._addThread(target=_CreateLock, args=(prev, ev, name))
+ prev = ev
+
+ # Add locks
+ first.set()
+ self._waitThreads()
+
+ # Check order in which locks were added
+ self.assertEqual([i.name for i in locks], expnames)
+
+ # Check query result
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assert_(isinstance(result, dict))
+ response = objects.QueryResponse.FromDict(result)
+ self.assertEqual(response.data,
+ [[(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, [])]
+ for name in utils.NiceSort(expnames)])
+ self.assertEqual(len(response.fields), 4)
+ self.assertEqual(["name", "mode", "owner", "pending"],
+ [fdef.name for fdef in response.fields])
+
+ # Test exclusive acquire
+ for tlock in locks[::4]:
+ tlock.acquire(shared=0)
+ try:
+ def _GetExpResult(name):
+ if tlock.name == name:
+ return [(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, "exclusive"),
+ (constants.QRFS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.QRFS_NORMAL, [])]
+ return [(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, [])]
+
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [_GetExpResult(name)
+ for name in utils.NiceSort(expnames)])
+ finally:
+ tlock.release()
+
+ # Test shared acquire
+ def _Acquire(lock, shared, ev, notify):
+ lock.acquire(shared=shared)
+ try:
+ notify.set()
+ ev.wait()
+ finally:
+ lock.release()
+
+ for tlock1 in locks[::11]:
+ for tlock2 in locks[::-15]:
+ if tlock2 == tlock1:
+ # Avoid deadlocks
+ continue
+
+ for tlock3 in locks[::10]:
+ if tlock3 in (tlock2, tlock1):
+ # Avoid deadlocks
+ continue
+
+ releaseev = threading.Event()
+
+ # Acquire locks
+ acquireev = []
+ tthreads1 = []
+ for i in range(3):
+ ev = threading.Event()
+ tthreads1.append(self._addThread(target=_Acquire,
+ args=(tlock1, 1, releaseev, ev)))
+ acquireev.append(ev)
+
+ ev = threading.Event()
+ tthread2 = self._addThread(target=_Acquire,
+ args=(tlock2, 1, releaseev, ev))
+ acquireev.append(ev)
+
+ ev = threading.Event()
+ tthread3 = self._addThread(target=_Acquire,
+ args=(tlock3, 0, releaseev, ev))
+ acquireev.append(ev)
+
+ # Wait for all locks to be acquired
+ for i in acquireev:
+ i.wait()
+
+ # Check query result
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ response = objects.QueryResponse.FromDict(result)
+ for (name, mode, owner) in response.data:
+ (name_status, name_value) = name
+ (owner_status, owner_value) = owner
+
+ self.assertEqual(name_status, constants.QRFS_NORMAL)
+ self.assertEqual(owner_status, constants.QRFS_NORMAL)
+
+ if name_value == tlock1.name:
+ self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
+ self.assertEqual(set(owner_value),
+ set(i.getName() for i in tthreads1))
+ continue
+
+ if name_value == tlock2.name:
+ self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
+ self.assertEqual(owner_value, [tthread2.getName()])
+ continue
+
+ if name_value == tlock3.name:
+ self.assertEqual(mode, (constants.QRFS_NORMAL, "exclusive"))
+ self.assertEqual(owner_value, [tthread3.getName()])
+ continue
+
+ self.assert_(name_value in expnames)
+ self.assertEqual(mode, (constants.QRFS_NORMAL, None))
+ self.assert_(owner_value is None)
+
+ # Release locks again
+ releaseev.set()
+
+ self._waitThreads()
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None)]
+ for name in utils.NiceSort(expnames)])
+
+ def testDelete(self):
+ lock = locking.SharedLock("TestLock", monitor=self.lm)
+
+ self.assertEqual(len(self.lm._locks), 1)
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None)]])
+
+ lock.delete()
+
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, "deleted"),
+ (constants.QRFS_NORMAL, None)]])
+ self.assertEqual(len(self.lm._locks), 1)
+
+ def testPending(self):
+ def _Acquire(lock, shared, prev, next):
+ prev.wait()
+
+ lock.acquire(shared=shared, test_notify=next.set)
+ try:
+ pass
+ finally:
+ lock.release()
+
+ lock = locking.SharedLock("ExcLock", monitor=self.lm)
+
+ for shared in [0, 1]:
+ lock.acquire()
+ try:
+ self.assertEqual(len(self.lm._locks), 1)
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, "exclusive"),
+ (constants.QRFS_NORMAL,
+ [threading.currentThread().getName()])]])
+
+ threads = []
+
+ first = threading.Event()
+ prev = first
+
+ for i in range(5):
+ ev = threading.Event()
+ threads.append(self._addThread(target=_Acquire,
+ args=(lock, shared, prev, ev)))
+ prev = ev
+
+ # Start acquires
+ first.set()
+
+ # Wait for last acquire to start waiting
+ prev.wait()
+
+ # NOTE: This works only because QueryLocks will acquire the
+ # lock-internal lock again and won't be able to get the information
+ # until it has the lock. By then the acquire should be registered in
+ # SharedLock.__pending (otherwise it's a bug).
+
+ # All acquires are waiting now
+ if shared:
+ pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
+ else:
+ pending = [("exclusive", [t.getName()]) for t in threads]
+
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, "exclusive"),
+ (constants.QRFS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.QRFS_NORMAL, pending)]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+ finally:
+ lock.release()
+
+ self._waitThreads()
+
+ # No pending acquires
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, [])]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()