finally:
self.__lock.release()
+
+class LockSet:
+ """Implements a set of locks.
+
+ This abstraction implements a set of shared locks for the same resource type,
+ distinguished by name. The user can lock a subset of the resources and the
+ LockSet will take care of acquiring the locks always in the same order, thus
+ preventing deadlock.
+
+ All the locks needed in the same set must be acquired together, though.
+
+ """
+ def __init__(self, members=None):
+ """Constructs a new LockSet.
+
+ Args:
+ members: initial members of the set
+
+ """
+ # Used internally to guarantee coherency.
+ self.__lock = SharedLock()
+
+ # The lockdict indexes the relationship name -> lock
+ # The order-of-locking is implied by the alphabetical order of names
+ self.__lockdict = {}
+
+ if members is not None:
+ for name in members:
+ self.__lockdict[name] = SharedLock()
+
+ # The owner dict contains the set of locks each thread owns. For
+ # performance each thread can access its own key without a global lock on
+ # this structure. It is paramount though that *no* other type of access is
+ # done to this structure (eg. no looping over its keys). *_owner helper
+ # function are defined to guarantee access is correct, but in general never
+ # do anything different than __owners[threading.currentThread()], or there
+ # will be trouble.
+ self.__owners = {}
+
+ def _is_owned(self):
+ """Is the current thread a current level owner?"""
+ return threading.currentThread() in self.__owners
+
+ def _add_owned(self, name):
+ """Note the current thread owns the given lock"""
+ if self._is_owned():
+ self.__owners[threading.currentThread()].add(name)
+ else:
+ self.__owners[threading.currentThread()] = set([name])
+
+ def _del_owned(self, name):
+ """Note the current thread owns the given lock"""
+ self.__owners[threading.currentThread()].remove(name)
+
+ if not self.__owners[threading.currentThread()]:
+ del self.__owners[threading.currentThread()]
+
+ def _list_owned(self):
+ """Get the set of resource names owned by the current thread"""
+ if self._is_owned():
+ return self.__owners[threading.currentThread()].copy()
+ else:
+ return set()
+
+ def __names(self):
+ """Return the current set of names.
+
+ Only call this function while holding __lock and don't iterate on the
+ result after releasing the lock.
+
+ """
+ return set(self.__lockdict.keys())
+
+ def _names(self):
+ """Return a copy of the current set of elements.
+
+ Used only for debugging purposes.
+ """
+ self.__lock.acquire(shared=1)
+ try:
+ result = self.__names()
+ finally:
+ self.__lock.release()
+ return result
+
+ def acquire(self, names, blocking=1, shared=0):
+ """Acquire a set of resource locks.
+
+ Args:
+ names: the names of the locks which shall be acquired.
+ (special lock names, or instance/node names)
+ shared: whether to acquire in shared mode. By default an exclusive lock
+ will be acquired.
+ blocking: whether to block while trying to acquire or to operate in try-lock mode.
+ this locking mode is not supported yet.
+
+ Returns:
+ True: when all the locks are successfully acquired
+
+ Raises:
+ errors.LockError: when any lock we try to acquire has been deleted
+ before we succeed. In this case none of the locks requested will be
+ acquired.
+
+ """
+ if not blocking:
+ # We don't have non-blocking mode for now
+ raise NotImplementedError
+
+ # Check we don't already own locks at this level
+ assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+
+ # Support passing in a single resource to acquire rather than many
+ if isinstance(names, basestring):
+ names = [names]
+ else:
+ names.sort()
+
+ # Now names contains a sorted list of resources whose lock we want to
+ # acquire. In order to get them we loop on this (private) list and look
+ # them up in __lockdict. Since we have no lock held on lockdict we have no
+ # guarantees on their presence, and they may even disappear after we looked
+ # them up. This is fine though as .acquire() itself is safe and will alert
+ # us if the lock gets deleted.
+
+ try:
+ for lname in names:
+ lock = self.__lockdict[lname] # raises KeyError if the lock is not there
+ lock.acquire(shared=shared) # raises LockError if the lock is deleted
+ try:
+ # now the lock cannot be deleted, we have it!
+ self._add_owned(lname)
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ lock.release()
+ raise
+
+ except (KeyError, errors.LockError):
+ name_fail = lname
+ for lname in self._list_owned():
+ self.__lockdict[lname].release()
+ self._del_owned(lname)
+ raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+
+ return True
+
+ def release(self, names=None):
+ """Release a set of resource locks, at the same level.
+
+ You must have acquired the locks, either in shared or in exclusive mode,
+ before releasing them.
+
+ Args:
+ names: the names of the locks which shall be released.
+ (defaults to all the locks acquired at that level).
+
+ """
+
+ assert self._is_owned(), "release() on lock set while not owner"
+
+ # Support passing in a single resource to release rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ if names is None:
+ names = self._list_owned()
+ else:
+ names = set(names)
+ assert self._list_owned().issuperset(names), (
+ "release() on unheld resources %s" %
+ names.difference(self._list_owned()))
+
+ for lockname in names:
+ # If we are sure the lock doesn't leave __lockdict without being
+ # exclusively held we can do this...
+ self.__lockdict[lockname].release()
+ self._del_owned(lockname)
+
+ def add(self, names, acquired=0, shared=0):
+ """Add a new set of elements to the set
+
+ Args:
+ names: names of the new elements to add
+ acquired: pre-acquire the new resource?
+ shared: is the pre-acquisition shared?
+
+ """
+ # Support passing in a single resource to add rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ # Acquire the internal lock in an exclusive way, so there cannot be a
+ # conflicting add()
+ self.__lock.acquire()
+ try:
+ invalid_names = self.__names().intersection(names)
+ if invalid_names:
+ # This must be an explicit raise, not an assert, because assert is
+ # turned off when using optimization, and this can happen because of
+ # concurrency even if the user doesn't want it.
+ raise errors.LockError("duplicate add() (%s)" % invalid_names)
+
+ for lockname in names:
+ lock = SharedLock()
+
+ if acquired:
+ lock.acquire(shared=shared)
+ # now the lock cannot be deleted, we have it!
+ try:
+ self._add_owned(lockname)
+ except:
+ # We shouldn't have problems adding the lock to the owners list,
+ # but if we did we'll try to release this lock and re-raise
+ # exception. Of course something is going to be really wrong,
+ # after this. On the other hand the lock hasn't been added to the
+ # __lockdict yet so no other threads should be pending on it. This
+ # release is just a safety measure.
+ lock.release()
+ raise
+
+ self.__lockdict[lockname] = lock
+
+ finally:
+ self.__lock.release()
+
+ return True
+
+ def remove(self, names, blocking=1):
+ """Remove elements from the lock set.
+
+ You can either not hold anything in the lockset or already hold a superset
+ of the elements you want to delete, exclusively.
+
+ Args:
+ names: names of the resource to remove.
+ blocking: whether to block while trying to acquire or to operate in
+ try-lock mode. this locking mode is not supported yet unless
+ you are already holding exclusively the locks.
+
+ Returns:
+ A list of lock which we failed to delete. The list is always empty if we
+ were holding all the locks exclusively.
+
+ """
+ if not blocking and not self._is_owned():
+ # We don't have non-blocking mode for now
+ raise NotImplementedError
+
+ # Support passing in a single resource to remove rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ # If we own any subset of this lock it must be a superset of what we want
+ # to delete. The ownership must also be exclusive, but that will be checked
+ # by the lock itself.
+ assert not self._is_owned() or self._list_owned().issuperset(names), (
+ "remove() on acquired lockset while not owning all elements")
+
+ delete_failed=[]
+
+ for lname in names:
+ # Calling delete() acquires the lock exclusively if we don't already own
+ # it, and causes all pending and subsequent lock acquires to fail. It's
+ # fine to call it out of order because delete() also implies release(),
+ # and the assertion above guarantees that if we either already hold
+ # everything we want to delete, or we hold none.
+ try:
+ self.__lockdict[lname].delete()
+ except (KeyError, errors.LockError):
+ delete_failed.append(lname)
+ # This cannot happen if we were already holding it, verify:
+ assert not self._is_owned(), "remove failed while holding lockset"
+ else:
+ # If no LockError was raised we are the ones who deleted the lock.
+ # This means we can safely remove it from lockdict, as any further or
+ # pending delete() or acquire() will fail (and nobody can have the lock
+ # since before our call to delete()).
+ #
+ # This is done in an else clause because if the exception was thrown
+ # it's the job of the one who actually deleted it.
+ del self.__lockdict[lname]
+ # And let's remove it from our private list if we owned it.
+ if self._is_owned():
+ self._del_owned(lname)
+
+ return delete_failed
+
self.assertEqual(self.done.get(True, 1), 'ERR')
+class TestLockSet(unittest.TestCase):
+ """LockSet tests"""
+
+ def setUp(self):
+ self.resources = ['one', 'two', 'three']
+ self.ls = locking.LockSet(self.resources)
+ # helper threads use the 'done' queue to tell the master they finished.
+ self.done = Queue.Queue(0)
+
+ def testResources(self):
+ self.assertEquals(self.ls._names(), set(self.resources))
+ newls = locking.LockSet()
+ self.assertEquals(newls._names(), set())
+
+ def testAcquireRelease(self):
+ self.ls.acquire('one')
+ self.assertEquals(self.ls._list_owned(), set(['one']))
+ self.ls.release()
+ self.assertEquals(self.ls._list_owned(), set())
+ self.ls.acquire(['one'])
+ self.assertEquals(self.ls._list_owned(), set(['one']))
+ self.ls.release()
+ self.assertEquals(self.ls._list_owned(), set())
+ self.ls.acquire(['one', 'two', 'three'])
+ self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
+ self.ls.release('one')
+ self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
+ self.ls.release(['three'])
+ self.assertEquals(self.ls._list_owned(), set(['two']))
+ self.ls.release()
+ self.assertEquals(self.ls._list_owned(), set())
+ self.ls.acquire(['one', 'three'])
+ self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+ self.ls.release()
+ self.assertEquals(self.ls._list_owned(), set())
+
+ def testNoDoubleAcquire(self):
+ self.ls.acquire('one')
+ self.assertRaises(AssertionError, self.ls.acquire, 'one')
+ self.assertRaises(AssertionError, self.ls.acquire, ['two'])
+ self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
+ self.ls.release()
+ self.ls.acquire(['one', 'three'])
+ self.ls.release('one')
+ self.assertRaises(AssertionError, self.ls.acquire, ['two'])
+ self.ls.release('three')
+
+ def testNoWrongRelease(self):
+ self.assertRaises(AssertionError, self.ls.release)
+ self.ls.acquire('one')
+ self.assertRaises(AssertionError, self.ls.release, 'two')
+
+ def testAddRemove(self):
+ self.ls.add('four')
+ self.assertEquals(self.ls._list_owned(), set())
+ self.assert_('four' in self.ls._names())
+ self.ls.add(['five', 'six', 'seven'], acquired=1)
+ self.assert_('five' in self.ls._names())
+ self.assert_('six' in self.ls._names())
+ self.assert_('seven' in self.ls._names())
+ self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
+ self.ls.remove(['five', 'six'])
+ self.assert_('five' not in self.ls._names())
+ self.assert_('six' not in self.ls._names())
+ self.assertEquals(self.ls._list_owned(), set(['seven']))
+ self.ls.add('eight', acquired=1, shared=1)
+ self.assert_('eight' in self.ls._names())
+ self.assertEquals(self.ls._list_owned(), set(['seven', 'eight']))
+ self.ls.remove('seven')
+ self.assert_('seven' not in self.ls._names())
+ self.assertEquals(self.ls._list_owned(), set(['eight']))
+ self.ls.release()
+ self.ls.remove(['two'])
+ self.assert_('two' not in self.ls._names())
+ self.ls.acquire('three')
+ self.ls.remove(['three'])
+ self.assert_('three' not in self.ls._names())
+ self.assertEquals(self.ls.remove('three'), ['three'])
+ self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six'])
+ self.assert_('one' not in self.ls._names())
+
+ def testRemoveNonBlocking(self):
+ self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
+ self.ls.acquire('one')
+ self.assertEquals(self.ls.remove('one', blocking=0), [])
+ self.ls.acquire(['two', 'three'])
+ self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), [])
+
+ def testNoDoubleAdd(self):
+ self.assertRaises(errors.LockError, self.ls.add, 'two')
+ self.ls.add('four')
+ self.assertRaises(errors.LockError, self.ls.add, 'four')
+
+ def testNoWrongRemoves(self):
+ self.ls.acquire(['one', 'three'], shared=1)
+ # Cannot remove 'two' while holding something which is not a superset
+ self.assertRaises(AssertionError, self.ls.remove, 'two')
+ # Cannot remove 'three' as we are sharing it
+ self.assertRaises(AssertionError, self.ls.remove, 'three')
+
+ def _doLockSet(self, set, shared):
+ try:
+ self.ls.acquire(set, shared=shared)
+ self.done.put('DONE')
+ self.ls.release()
+ except errors.LockError:
+ self.done.put('ERR')
+
+ def _doRemoveSet(self, set):
+ self.done.put(self.ls.remove(set))
+
+ def testConcurrentSharedAcquire(self):
+ self.ls.acquire(['one', 'two'], shared=1)
+ Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ Thread(target=self._doLockSet, args=('three', 1)).start()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
+ Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
+ self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+ self.ls.release()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+
+ def testConcurrentExclusiveAcquire(self):
+ self.ls.acquire(['one', 'two'])
+ Thread(target=self._doLockSet, args=('three', 1)).start()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ Thread(target=self._doLockSet, args=('three', 0)).start()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
+ Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
+ Thread(target=self._doLockSet, args=('one', 0)).start()
+ Thread(target=self._doLockSet, args=('one', 1)).start()
+ Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
+ Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
+ self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+ self.ls.release()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+
+ def testConcurrentRemove(self):
+ self.ls.add('four')
+ self.ls.acquire(['one', 'two', 'four'])
+ Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
+ Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
+ Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
+ Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
+ self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+ self.ls.remove('one')
+ self.ls.release()
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.assertEqual(self.done.get(True, 1), 'ERR')
+ self.ls.add(['five', 'six'], acquired=1)
+ Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
+ Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
+ Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
+ Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
+ self.ls.remove('five')
+ self.ls.release()
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ self.ls.acquire(['three', 'four'])
+ Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
+ self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+ self.ls.remove('four')
+ self.assertEqual(self.done.get(True, 1), ['four'])
+ Thread(target=self._doRemoveSet, args=(['two'])).start()
+ self.assertEqual(self.done.get(True, 1), [])
+ self.ls.release()
+
+
if __name__ == '__main__':
unittest.main()
#suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)