from ganeti import utils
def ssynchronized(lock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics), i.e. to provide acquire(shared=...) and release().

  The lock is always released when the decorated function exits, whether it
  returns normally or raises. The decorated function keeps its own name and
  docstring.

  lock: the lock object to hold around every call
  shared: forwarded unchanged as the "shared" keyword of lock.acquire();
      the default of 0 asks for exclusive mode

  Returns a decorator which wraps a callable so that it runs under the lock
  and passes through its return value.

  """
  # Imported locally so that this function does not depend on the module's
  # import block.
  import functools

  def wrap(fn):
    # functools.wraps copies __name__/__doc__/__module__ onto the wrapper so
    # that decorated functions stay identifiable in tracebacks and docs.
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        # Runs on both normal return and exception, so the lock never leaks.
        lock.release()
    return sync_function
  return wrap
+
+
class SharedLock:
"""Implements a shared lock.
from threading import Thread
+# This is used to test the ssynchronized decorator.
+# Decorator arguments are evaluated at definition time, so it must be a global.
+_decoratorlock = locking.SharedLock()
+
+
class TestSharedLock(unittest.TestCase):
"""SharedLock tests"""
self.assertEqual(self.done.get(True, 1), 'ERR')
class TestSSynchronizedDecorator(unittest.TestCase):
  """Shared Lock Synchronized decorator test

  All tests share the module-level _decoratorlock, so every test which takes
  the lock by hand releases it in a "finally" clause; otherwise one failed
  assertion would leave the lock held for the tests that run afterwards.

  """

  def setUp(self):
    # helper threads use the 'done' queue to tell the master they finished.
    self.done = Queue.Queue(0)

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Check the lock is owned, then report 'EXC' on the done queue."""
    self.assert_(_decoratorlock._is_owned())
    self.done.put('EXC')

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Check the lock is owned in shared mode, then report 'SHR'."""
    self.assert_(_decoratorlock._is_owned(shared=1))
    self.done.put('SHR')

  def testDecoratedFunctions(self):
    """The decorator must release the lock once the function returns."""
    self._doItExclusive()
    self.assert_(not _decoratorlock._is_owned())
    self._doItSharer()
    self.assert_(not _decoratorlock._is_owned())

  def testSharersCanCoexist(self):
    """A decorated sharer runs while this thread holds the lock shared."""
    _decoratorlock.acquire(shared=1)
    try:
      Thread(target=self._doItSharer).start()
      self.assertEqual(self.done.get(True, 1), 'SHR')
    finally:
      _decoratorlock.release()

  def testExclusiveBlocksExclusive(self):
    """A decorated exclusive call waits while the lock is held exclusively."""
    _decoratorlock.acquire()
    try:
      Thread(target=self._doItExclusive).start()
      # give it a bit of time to check that it's not actually doing anything
      self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    finally:
      _decoratorlock.release()
    # Once the lock is free the helper thread must be able to finish.
    self.assertEqual(self.done.get(True, 1), 'EXC')

  def testExclusiveBlocksSharer(self):
    """A decorated sharer waits while the lock is held exclusively."""
    _decoratorlock.acquire()
    try:
      Thread(target=self._doItSharer).start()
      time.sleep(0.05)
      self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    finally:
      _decoratorlock.release()
    # Once the lock is free the helper thread must be able to finish.
    self.assertEqual(self.done.get(True, 1), 'SHR')

  def testSharerBlocksExclusive(self):
    """A decorated exclusive call waits while the lock is held shared."""
    _decoratorlock.acquire(shared=1)
    try:
      Thread(target=self._doItExclusive).start()
      time.sleep(0.05)
      self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    finally:
      _decoratorlock.release()
    # Once the lock is free the helper thread must be able to finish.
    self.assertEqual(self.done.get(True, 1), 'EXC')
+
+
class TestLockSet(unittest.TestCase):
"""LockSet tests"""