import logging
import heapq
import itertools
+import time
from ganeti import errors
from ganeti import utils
"__pending_by_prio",
"__pending_shared",
"__shr",
+ "__time_fn",
"name",
]
__condition_class = _PipeConditionWithMode
- def __init__(self, name, monitor=None):
+ def __init__(self, name, monitor=None, _time_fn=time.time):
"""Construct a new SharedLock.
@param name: the name of the lock
self.name = name
+ # Used for unittesting
+ self.__time_fn = _time_fn
+
# Internal lock
self.__lock = threading.Lock()
assert priority not in self.__pending_shared
self.__pending_shared[priority] = wait_condition
+ wait_start = self.__time_fn()
+ acquired = False
+
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
- # TODO: Decrease timeout with spurious notifications
- while not (self.__is_on_top(wait_condition) and
- self.__can_acquire(shared)):
- # Wait for notification
- wait_condition.wait(timeout)
- self.__check_deleted()
+ while True:
+ if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+ self.__do_acquire(shared)
+ acquired = True
+ break
- # A lot of code assumes blocking acquires always succeed. Loop
- # internally for that case.
- if timeout is not None:
+ # A lot of code assumes blocking acquires always succeed, therefore we
+ # can never return False for a blocking acquire
+ if (timeout is not None and
+ utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
break
- if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
- self.__do_acquire(shared)
- return True
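+ # Wait for notification. NOTE(review): the full timeout is passed on each pass, so a wakeup that cannot acquire may extend the total wait -- confirm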
+ wait_condition.wait(timeout)
+ self.__check_deleted()
finally:
# Remove condition from queue if there are no more waiters
if not wait_condition.has_waiting():
# (e.g. on lock deletion)
self.__pending_shared.pop(priority, None)
- return False
+ return acquired
def acquire(self, shared=0, timeout=None, priority=None,
test_notify=None):
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
+ notify = True
else:
self.__shr.remove(threading.currentThread())
+ notify = not self.__shr
- # Notify topmost condition in queue
- (priority, prioqueue) = self.__find_first_pending_queue()
- if prioqueue:
- cond = prioqueue[0]
- cond.notifyAll()
- if cond.shared:
- # Prevent further shared acquires from sneaking in while waiters are
- # notified
- self.__pending_shared.pop(priority, None)
+ # Notify topmost condition in queue if there are no owners left (for
+ # shared locks)
+ if notify:
+ self.__notify_topmost()
+ finally:
+ self.__lock.release()
+
+ def __notify_topmost(self):  # Does not take self.__lock itself; _notify_topmost wraps it with the lock
+ """Notifies topmost condition in queue of pending acquires.
+
+ """
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:  # Falsy prioqueue: nothing is notified
+ cond = prioqueue[0]  # First condition in the returned queue
+ cond.notifyAll()  # Notify all waiters on that condition, not just one
+ if cond.shared:
+ # Prevent further shared acquires from sneaking in while waiters are
+ # notified
+ self.__pending_shared.pop(priority, None)  # Default None: no KeyError if already removed
+
+ def _notify_topmost(self):  # Wrapper that runs __notify_topmost under self.__lock
+ """Exported version of L{__notify_topmost}.
+ """
+ self.__lock.acquire()  # Hold the internal lock for the duration of the call
+ try:
+ return self.__notify_topmost()  # Result is passed through unchanged
finally:
self.__lock.release()  # Released even if the notification raises
if not acquired:
acquired = self.__acquire_unlocked(0, timeout, priority)
+ if acquired:
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
- if acquired:
self.__deleted = True
self.__exc = None