import logging
import heapq
import itertools
+import time
from ganeti import errors
from ganeti import utils
"__pending_by_prio",
"__pending_shared",
"__shr",
+ "__time_fn",
"name",
]
__condition_class = _PipeConditionWithMode
- def __init__(self, name, monitor=None):
+ def __init__(self, name, monitor=None, _time_fn=time.time):
"""Construct a new SharedLock.
@param name: the name of the lock
self.name = name
+ # Used for unittesting
+ self.__time_fn = _time_fn
+
# Internal lock
self.__lock = threading.Lock()
assert priority not in self.__pending_shared
self.__pending_shared[priority] = wait_condition
+ wait_start = self.__time_fn()
+ acquired = False
+
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
- # TODO: Decrease timeout with spurious notifications
- while not (self.__is_on_top(wait_condition) and
- self.__can_acquire(shared)):
- # Wait for notification
- wait_condition.wait(timeout)
- self.__check_deleted()
+ while True:
+ if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+ self.__do_acquire(shared)
+ acquired = True
+ break
- # A lot of code assumes blocking acquires always succeed. Loop
- # internally for that case.
- if timeout is not None:
+ # A lot of code assumes blocking acquires always succeed, therefore we
+ # can never return False for a blocking acquire
+ if (timeout is not None and
+ utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
break
- if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
- self.__do_acquire(shared)
- return True
+ # Wait for notification
+ wait_condition.wait(timeout)
+ self.__check_deleted()
finally:
# Remove condition from queue if there are no more waiters
if not wait_condition.has_waiting():
# (e.g. on lock deletion)
self.__pending_shared.pop(priority, None)
- return False
+ return acquired
def acquire(self, shared=0, timeout=None, priority=None,
test_notify=None):
self.__shr.remove(threading.currentThread())
# Notify topmost condition in queue
- (priority, prioqueue) = self.__find_first_pending_queue()
- if prioqueue:
- cond = prioqueue[0]
- cond.notifyAll()
- if cond.shared:
- # Prevent further shared acquires from sneaking in while waiters are
- # notified
- self.__pending_shared.pop(priority, None)
+ self.__notify_topmost()
+ finally:
+ self.__lock.release()
+
+ def __notify_topmost(self):
+ """Notifies topmost condition in queue of pending acquires.
+
+ """
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:
+ cond = prioqueue[0]
+ cond.notifyAll()
+ if cond.shared:
+ # Prevent further shared acquires from sneaking in while waiters are
+ # notified
+ self.__pending_shared.pop(priority, None)
+
+ def _notify_topmost(self):
+ """Exported version of L{__notify_topmost}.
+ """
+ self.__lock.acquire()
+ try:
+ return self.__notify_topmost()
finally:
self.__lock.release()
for i in sorted(perprio.keys())]
for (shared, _, threads) in acquires])
+ class _FakeTimeForSpuriousNotifications:
+ def __init__(self, now, check_end):
+ self.now = now
+ self.check_end = check_end
+
+ # Deterministic random number generator
+ self.rnd = random.Random(15086)
+
+ def time(self):
+ # Advance time if the random number generator thinks so (this is to test
+ # multiple notifications without advancing the time)
+ if self.rnd.random() < 0.3:
+ self.now += self.rnd.random()
+
+ self.check_end(self.now)
+
+ return self.now
+
+ @_Repeat
+ def testAcquireTimeoutWithSpuriousNotifications(self):
+ ready = threading.Event()
+ locked = threading.Event()
+ req = Queue.Queue(0)
+
+ epoch = 4000.0
+ timeout = 60.0
+
+ def check_end(now):
+ self.assertFalse(locked.isSet())
+
+ # If we waited long enough (in virtual time), tell main thread to release
+ # lock, otherwise tell it to notify once more
+ req.put(now < (epoch + (timeout * 0.8)))
+
+ time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+ sl = locking.SharedLock("test", _time_fn=time_fn)
+
+ # Acquire in exclusive mode
+ sl.acquire(shared=0)
+
+ def fn():
+ self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+ test_notify=ready.set))
+ locked.set()
+ sl.release()
+ self.done.put("success")
+
+ # Start acquire with timeout and wait for it to be ready
+ self._addThread(target=fn)
+ ready.wait()
+
+ # The separate thread is now waiting to acquire the lock, so start sending
+ # spurious notifications.
+
+ # Wait for separate thread to ask for another notification
+ count = 0
+ while req.get():
+ # After sending the notification, the lock will take a short amount of
+ # time to notice and to retrieve the current time
+ sl._notify_topmost()
+ count += 1
+
+ self.assertTrue(count > 100, "Not enough notifications were sent")
+
+ self.assertFalse(locked.isSet())
+
+ # Some notifications have been sent, now actually release the lock
+ sl.release()
+
+ # Wait for lock to be acquired
+ locked.wait()
+
+ self._waitThreads()
+
+ self.assertEqual(self.done.get_nowait(), "success")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""