Revision 8d7d8b57
b/lib/locking.py | ||
---|---|---|
33 | 33 |
import logging |
34 | 34 |
import heapq |
35 | 35 |
import itertools |
36 |
import time |
|
36 | 37 |
|
37 | 38 |
from ganeti import errors |
38 | 39 |
from ganeti import utils |
... | ... | |
404 | 405 |
"__pending_by_prio", |
405 | 406 |
"__pending_shared", |
406 | 407 |
"__shr", |
408 |
"__time_fn", |
|
407 | 409 |
"name", |
408 | 410 |
] |
409 | 411 |
|
410 | 412 |
__condition_class = _PipeConditionWithMode |
411 | 413 |
|
412 |
def __init__(self, name, monitor=None): |
|
414 |
def __init__(self, name, monitor=None, _time_fn=time.time):
|
|
413 | 415 |
"""Construct a new SharedLock. |
414 | 416 |
|
415 | 417 |
@param name: the name of the lock |
... | ... | |
421 | 423 |
|
422 | 424 |
self.name = name |
423 | 425 |
|
426 |
# Used for unittesting |
|
427 |
self.__time_fn = _time_fn |
|
428 |
|
|
424 | 429 |
# Internal lock |
425 | 430 |
self.__lock = threading.Lock() |
426 | 431 |
|
... | ... | |
682 | 687 |
assert priority not in self.__pending_shared |
683 | 688 |
self.__pending_shared[priority] = wait_condition |
684 | 689 |
|
690 |
wait_start = self.__time_fn() |
|
691 |
acquired = False |
|
692 |
|
|
685 | 693 |
try: |
686 | 694 |
# Wait until we become the topmost acquire in the queue or the timeout |
687 | 695 |
# expires. |
688 |
# TODO: Decrease timeout with spurious notifications |
|
689 |
while not (self.__is_on_top(wait_condition) and |
|
690 |
self.__can_acquire(shared)): |
|
691 |
# Wait for notification |
|
692 |
wait_condition.wait(timeout) |
|
693 |
self.__check_deleted() |
|
696 |
while True: |
|
697 |
if self.__is_on_top(wait_condition) and self.__can_acquire(shared): |
|
698 |
self.__do_acquire(shared) |
|
699 |
acquired = True |
|
700 |
break |
|
694 | 701 |
|
695 |
# A lot of code assumes blocking acquires always succeed. Loop |
|
696 |
# internally for that case. |
|
697 |
if timeout is not None: |
|
702 |
# A lot of code assumes blocking acquires always succeed, therefore we |
|
703 |
# can never return False for a blocking acquire |
|
704 |
if (timeout is not None and |
|
705 |
utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)): |
|
698 | 706 |
break |
699 | 707 |
|
700 |
if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
|
|
701 |
self.__do_acquire(shared)
|
|
702 |
return True
|
|
708 |
# Wait for notification
|
|
709 |
wait_condition.wait(timeout)
|
|
710 |
self.__check_deleted()
|
|
703 | 711 |
finally: |
704 | 712 |
# Remove condition from queue if there are no more waiters |
705 | 713 |
if not wait_condition.has_waiting(): |
... | ... | |
709 | 717 |
# (e.g. on lock deletion) |
710 | 718 |
self.__pending_shared.pop(priority, None) |
711 | 719 |
|
712 |
return False
|
|
720 |
return acquired
|
|
713 | 721 |
|
714 | 722 |
def acquire(self, shared=0, timeout=None, priority=None, |
715 | 723 |
test_notify=None): |
... | ... | |
800 | 808 |
self.__shr.remove(threading.currentThread()) |
801 | 809 |
|
802 | 810 |
# Notify topmost condition in queue |
803 |
(priority, prioqueue) = self.__find_first_pending_queue() |
|
804 |
if prioqueue: |
|
805 |
cond = prioqueue[0] |
|
806 |
cond.notifyAll() |
|
807 |
if cond.shared: |
|
808 |
# Prevent further shared acquires from sneaking in while waiters are |
|
809 |
# notified |
|
810 |
self.__pending_shared.pop(priority, None) |
|
811 |
self.__notify_topmost() |
|
812 |
finally: |
|
813 |
self.__lock.release() |
|
814 |
|
|
815 |
def __notify_topmost(self): |
|
816 |
"""Notifies topmost condition in queue of pending acquires. |
|
817 |
|
|
818 |
""" |
|
819 |
(priority, prioqueue) = self.__find_first_pending_queue() |
|
820 |
if prioqueue: |
|
821 |
cond = prioqueue[0] |
|
822 |
cond.notifyAll() |
|
823 |
if cond.shared: |
|
824 |
# Prevent further shared acquires from sneaking in while waiters are |
|
825 |
# notified |
|
826 |
self.__pending_shared.pop(priority, None) |
|
827 |
|
|
828 |
def _notify_topmost(self): |
|
829 |
"""Exported version of L{__notify_topmost}. |
|
811 | 830 |
|
831 |
""" |
|
832 |
self.__lock.acquire() |
|
833 |
try: |
|
834 |
return self.__notify_topmost() |
|
812 | 835 |
finally: |
813 | 836 |
self.__lock.release() |
814 | 837 |
|
b/test/ganeti.locking_unittest.py | ||
---|---|---|
953 | 953 |
for i in sorted(perprio.keys())] |
954 | 954 |
for (shared, _, threads) in acquires]) |
955 | 955 |
|
956 |
class _FakeTimeForSpuriousNotifications: |
|
957 |
def __init__(self, now, check_end): |
|
958 |
self.now = now |
|
959 |
self.check_end = check_end |
|
960 |
|
|
961 |
# Deterministic random number generator |
|
962 |
self.rnd = random.Random(15086) |
|
963 |
|
|
964 |
def time(self): |
|
965 |
# Advance time if the random number generator thinks so (this is to test |
|
966 |
# multiple notifications without advancing the time) |
|
967 |
if self.rnd.random() < 0.3: |
|
968 |
self.now += self.rnd.random() |
|
969 |
|
|
970 |
self.check_end(self.now) |
|
971 |
|
|
972 |
return self.now |
|
973 |
|
|
974 |
@_Repeat |
|
975 |
def testAcquireTimeoutWithSpuriousNotifications(self): |
|
976 |
ready = threading.Event() |
|
977 |
locked = threading.Event() |
|
978 |
req = Queue.Queue(0) |
|
979 |
|
|
980 |
epoch = 4000.0 |
|
981 |
timeout = 60.0 |
|
982 |
|
|
983 |
def check_end(now): |
|
984 |
self.assertFalse(locked.isSet()) |
|
985 |
|
|
986 |
# If we waited long enough (in virtual time), tell main thread to release |
|
987 |
# lock, otherwise tell it to notify once more |
|
988 |
req.put(now < (epoch + (timeout * 0.8))) |
|
989 |
|
|
990 |
time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time |
|
991 |
|
|
992 |
sl = locking.SharedLock("test", _time_fn=time_fn) |
|
993 |
|
|
994 |
# Acquire in exclusive mode |
|
995 |
sl.acquire(shared=0) |
|
996 |
|
|
997 |
def fn(): |
|
998 |
self.assertTrue(sl.acquire(shared=0, timeout=timeout, |
|
999 |
test_notify=ready.set)) |
|
1000 |
locked.set() |
|
1001 |
sl.release() |
|
1002 |
self.done.put("success") |
|
1003 |
|
|
1004 |
# Start acquire with timeout and wait for it to be ready |
|
1005 |
self._addThread(target=fn) |
|
1006 |
ready.wait() |
|
1007 |
|
|
1008 |
# The separate thread is now waiting to acquire the lock, so start sending |
|
1009 |
# spurious notifications. |
|
1010 |
|
|
1011 |
# Wait for separate thread to ask for another notification |
|
1012 |
count = 0 |
|
1013 |
while req.get(): |
|
1014 |
# After sending the notification, the lock will take a short amount of |
|
1015 |
# time to notice and to retrieve the current time |
|
1016 |
sl._notify_topmost() |
|
1017 |
count += 1 |
|
1018 |
|
|
1019 |
self.assertTrue(count > 100, "Not enough notifications were sent") |
|
1020 |
|
|
1021 |
self.assertFalse(locked.isSet()) |
|
1022 |
|
|
1023 |
# Some notifications have been sent, now actually release the lock |
|
1024 |
sl.release() |
|
1025 |
|
|
1026 |
# Wait for lock to be acquired |
|
1027 |
locked.wait() |
|
1028 |
|
|
1029 |
self._waitThreads() |
|
1030 |
|
|
1031 |
self.assertEqual(self.done.get_nowait(), "success") |
|
1032 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
1033 |
|
|
956 | 1034 |
|
957 | 1035 |
class TestSharedLockInCondition(_ThreadedTestCase): |
958 | 1036 |
"""SharedLock as a condition lock tests""" |
Also available in: Unified diff