return False
- def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY,
+ def acquire(self, shared=0, timeout=None, priority=None,
test_notify=None):
"""Acquire a shared lock.
@param test_notify: Special callback function for unittesting
"""
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
self.__lock.acquire()
try:
# We already got the lock, notify now
finally:
self.__lock.release()
- def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
+ def delete(self, timeout=None, priority=None):
"""Delete a Shared Lock.
This operation will declare the lock for removal. First the lock will be
@param priority: Priority for acquiring lock
"""
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
self.__lock.acquire()
try:
assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
self.__lock.release()
return set(result)
- def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY,
+ def acquire(self, names, timeout=None, shared=0, priority=None,
test_notify=None):
"""Acquire a set of resource locks.
assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
" (lockset %s)" % self.name)
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
running_timeout = RunningTimeout(timeout, False)