# pylint: disable-msg=W0613,W0201
import threading
+# Wouldn't it be better to define LockError in the locking module?
+# Well, for now that's how the rest of the code does it...
+from ganeti import errors
class SharedLock:
self.__nwait_exc = 0
self.__nwait_shr = 0
+ # is this lock in the deleted state?
+ self.__deleted = False
+
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?"""
return threading.currentThread() in self.__shr
return result
+ def __wait(self,c):
+ """Wait on the given condition, and raise an exception if the current lock
+ is declared deleted in the meantime.
+
+ The deleted flag is only checked after c.wait() returns, so callers that
+ want to fail right away on an already deleted lock have to check the flag
+ themselves before calling this.
+
+ Args:
+ c: condition to wait on
+
+ Raises:
+ errors.LockError: if the lock is marked as deleted once the wait returns
+
+ """
+ c.wait()
+ if self.__deleted:
+ raise errors.LockError('deleted lock')
+
+ def __exclusive_acquire(self):
+ """Acquire the lock exclusively.
+
+ This is a private function that presumes you are already holding the
+ internal lock. It's defined separately to avoid code duplication between
+ acquire() and delete()
+
+ Raises:
+ errors.LockError: propagated from __wait() if the lock is marked as
+ deleted while this thread is waiting for its turn
+
+ """
+ # Register as a waiting exclusive acquirer. The finally clause undoes this
+ # both on success and when __wait() raises.
+ self.__nwait_exc += 1
+ try:
+ # This is to save ourselves from a nasty race condition that could
+ # theoretically make the sharers starve.
+ if self.__nwait_shr > 0 or self.__nwait_exc > 1:
+ self.__wait(self.__turn_exc)
+
+ # Keep waiting until there are no sharers and no exclusive holder.
+ while len(self.__shr) > 0 or self.__exc is not None:
+ self.__wait(self.__turn_exc)
+
+ # Record the current thread as the exclusive holder.
+ self.__exc = threading.currentThread()
+ finally:
+ self.__nwait_exc -= 1
+
+
def acquire(self, blocking=1, shared=0):
"""Acquire a shared lock.
self.__lock.acquire()
try:
+ if self.__deleted:
+ raise errors.LockError('deleted lock')
+
# We cannot acquire the lock if we already have it
assert not self.__is_owned(), "double acquire() on a non-recursive lock"
# we'll just wait while there are no exclusive holders.
if self.__nwait_exc > 0:
# TODO: if !blocking...
- self.__turn_shr.wait()
+ self.__wait(self.__turn_shr)
while self.__exc is not None:
# TODO: if !blocking...
- self.__turn_shr.wait()
+ self.__wait(self.__turn_shr)
self.__shr.add(threading.currentThread())
finally:
self.__nwait_shr -= 1
else:
- self.__nwait_exc += 1
- try:
- # This is to save ourselves from a nasty race condition that could
- # theoretically make the sharers starve.
- if self.__nwait_shr > 0 or self.__nwait_exc > 1:
- # TODO: if !blocking...
- self.__turn_exc.wait()
-
- while len(self.__shr) > 0 or self.__exc is not None:
- # TODO: if !blocking...
- self.__turn_exc.wait()
-
- self.__exc = threading.currentThread()
- finally:
- self.__nwait_exc -= 1
+ # TODO: if !blocking...
+ # (or modify __exclusive_acquire for non-blocking mode)
+ self.__exclusive_acquire()
finally:
self.__lock.release()
finally:
self.__lock.release()
+ def delete(self, blocking=1):
+ """Delete a Shared Lock.
+
+ This operation will declare the lock for removal. First the lock will be
+ acquired in exclusive mode if you don't already own it, then the lock
+ will be put in a state where any future and pending acquire() fail.
+
+ Args:
+ blocking: whether to block while trying to acquire or to operate in
+ try-lock mode. this locking mode is not supported yet unless
+ you are already holding exclusively the lock.
+
+ Raises:
+ errors.LockError: if the lock is already marked as deleted, or if it
+ gets deleted while this call waits to acquire it
+ NotImplementedError: if blocking is false and the calling thread does
+ not already hold the lock exclusively
+ AssertionError: if the calling thread currently holds the lock in
+ shared mode
+
+ """
+ self.__lock.acquire()
+ try:
+ assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
+
+ if self.__deleted:
+ raise errors.LockError('deleted lock')
+
+ # Only acquire if we are not already the exclusive holder.
+ if not self.__is_exclusive():
+ if not blocking:
+ # We don't have non-blocking mode for now
+ raise NotImplementedError
+ self.__exclusive_acquire()
+
+ # Mark the lock as deleted and clear the exclusive owner, so the lock is
+ # no longer held by this thread when delete() returns.
+ self.__deleted = True
+ self.__exc = None
+ # Wake up everybody, they will fail acquiring the lock and
+ # raise an exception instead.
+ self.__turn_exc.notifyAll()
+ self.__turn_shr.notifyAll()
+
+ finally:
+ self.__lock.release()
+
import Queue
from ganeti import locking
+from ganeti import errors
from threading import Thread
# helper functions: called in a separate thread they acquire the lock, send
# their identifier on the done queue, then release it.
def _doItSharer(self):
- self.sl.acquire(shared=1)
- self.done.put('SHR')
- self.sl.release()
+ try:
+ self.sl.acquire(shared=1)
+ self.done.put('SHR')
+ self.sl.release()
+ except errors.LockError:
+ # The lock was deleted before or while we were trying to acquire it
+ self.done.put('ERR')
def _doItExclusive(self):
- self.sl.acquire()
- self.done.put('EXC')
- self.sl.release()
+ try:
+ self.sl.acquire()
+ self.done.put('EXC')
+ self.sl.release()
+ except errors.LockError:
+ # The lock was deleted before or while we were trying to acquire it
+ self.done.put('ERR')
+
+ def _doItDelete(self):
+ try:
+ self.sl.acquire()
+ self.done.put('DEL')
+ self.sl.release()
+ except errors.LockError:
+ self.done.put('ERR')
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
self.sl.release()
self.assert_(self.done.get(True, 1))
+ def testExclusiveBlocksDelete(self):
+ # Hold the lock exclusively so that the helper thread has to wait
+ self.sl.acquire()
+ Thread(target=self._doItDelete).start()
+ # give it a bit of time to check that it's not actually doing anything
+ self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+ self.sl.release()
+ # Once we release, the helper must report something within one second
+ self.assert_(self.done.get(True, 1))
+
def testExclusiveBlocksSharer(self):
self.sl.acquire()
Thread(target=self._doItSharer).start()
self.sl.release()
self.assert_(self.done.get(True, 1))
+ def testSharerBlocksDelete(self):
+ # Hold the lock in shared mode so that the helper thread has to wait
+ self.sl.acquire(shared=1)
+ Thread(target=self._doItDelete).start()
+ time.sleep(0.05)
+ # Nothing may show up on the done queue while we are still sharing
+ self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+ self.sl.release()
+ # Once we release, the helper must report something within one second
+ self.assert_(self.done.get(True, 1))
+
def testWaitingExclusiveBlocksSharer(self):
self.sl.acquire(shared=1)
# the lock is acquired in shared mode...
self.assertEqual(self.done.get(True, 1), 'SHR')
self.assertEqual(self.done.get(True, 1), 'EXC')
+ def testNoNonBlocking(self):
+ # With the lock not held, non-blocking acquire() and delete() are both
+ # expected to be rejected with NotImplementedError
+ self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
+ self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
+ self.sl.acquire()
+ self.sl.delete(blocking=0) # Fine, because the lock is already acquired
+
+ def testDelete(self):
+ self.sl.delete()
+ # Once deleted, the lock can neither be acquired nor deleted again
+ self.assertRaises(errors.LockError, self.sl.acquire)
+ self.assertRaises(errors.LockError, self.sl.delete)
+
+ def testDeletePendingSharersExclusiveDelete(self):
+ # Hold the lock exclusively so that every helper thread started below
+ # ends up pending
+ self.sl.acquire()
+ Thread(target=self._doItSharer).start()
+ Thread(target=self._doItSharer).start()
+ time.sleep(0.05)
+ Thread(target=self._doItExclusive).start()
+ Thread(target=self._doItDelete).start()
+ time.sleep(0.05)
+ self.sl.delete()
+ # All four pending threads (two sharers, one exclusive and one
+ # _doItDelete helper) must report ERR
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+
+ def testDeletePendingDeleteExclusiveSharers(self):
+ # Hold the lock exclusively so that every helper thread started below
+ # ends up pending
+ self.sl.acquire()
+ Thread(target=self._doItDelete).start()
+ Thread(target=self._doItExclusive).start()
+ time.sleep(0.05)
+ Thread(target=self._doItSharer).start()
+ Thread(target=self._doItSharer).start()
+ time.sleep(0.05)
+ self.sl.delete()
+ # All four pending threads (one _doItDelete helper, one exclusive and
+ # two sharers) must report ERR
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+
if __name__ == '__main__':
unittest.main()