Revision 8d7d8b57 lib/locking.py
b/lib/locking.py | ||
---|---|---|
33 | 33 |
import logging |
34 | 34 |
import heapq |
35 | 35 |
import itertools |
36 |
import time |
|
36 | 37 |
|
37 | 38 |
from ganeti import errors |
38 | 39 |
from ganeti import utils |
... | ... | |
404 | 405 |
"__pending_by_prio", |
405 | 406 |
"__pending_shared", |
406 | 407 |
"__shr", |
408 |
"__time_fn", |
|
407 | 409 |
"name", |
408 | 410 |
] |
409 | 411 |
|
410 | 412 |
__condition_class = _PipeConditionWithMode |
411 | 413 |
|
412 |
def __init__(self, name, monitor=None): |
|
414 |
def __init__(self, name, monitor=None, _time_fn=time.time):
|
|
413 | 415 |
"""Construct a new SharedLock. |
414 | 416 |
|
415 | 417 |
@param name: the name of the lock |
... | ... | |
421 | 423 |
|
422 | 424 |
self.name = name |
423 | 425 |
|
426 |
# Used for unittesting |
|
427 |
self.__time_fn = _time_fn |
|
428 |
|
|
424 | 429 |
# Internal lock |
425 | 430 |
self.__lock = threading.Lock() |
426 | 431 |
|
... | ... | |
682 | 687 |
assert priority not in self.__pending_shared |
683 | 688 |
self.__pending_shared[priority] = wait_condition |
684 | 689 |
|
690 |
wait_start = self.__time_fn() |
|
691 |
acquired = False |
|
692 |
|
|
685 | 693 |
try: |
686 | 694 |
# Wait until we become the topmost acquire in the queue or the timeout |
687 | 695 |
# expires. |
688 |
# TODO: Decrease timeout with spurious notifications |
|
689 |
while not (self.__is_on_top(wait_condition) and |
|
690 |
self.__can_acquire(shared)): |
|
691 |
# Wait for notification |
|
692 |
wait_condition.wait(timeout) |
|
693 |
self.__check_deleted() |
|
696 |
while True: |
|
697 |
if self.__is_on_top(wait_condition) and self.__can_acquire(shared): |
|
698 |
self.__do_acquire(shared) |
|
699 |
acquired = True |
|
700 |
break |
|
694 | 701 |
|
695 |
# A lot of code assumes blocking acquires always succeed. Loop |
|
696 |
# internally for that case. |
|
697 |
if timeout is not None: |
|
702 |
# A lot of code assumes blocking acquires always succeed, therefore we |
|
703 |
# can never return False for a blocking acquire |
|
704 |
if (timeout is not None and |
|
705 |
utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)): |
|
698 | 706 |
break |
699 | 707 |
|
700 |
if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
|
|
701 |
self.__do_acquire(shared)
|
|
702 |
return True
|
|
708 |
# Wait for notification
|
|
709 |
wait_condition.wait(timeout)
|
|
710 |
self.__check_deleted()
|
|
703 | 711 |
finally: |
704 | 712 |
# Remove condition from queue if there are no more waiters |
705 | 713 |
if not wait_condition.has_waiting(): |
... | ... | |
709 | 717 |
# (e.g. on lock deletion) |
710 | 718 |
self.__pending_shared.pop(priority, None) |
711 | 719 |
|
712 |
return False
|
|
720 |
return acquired
|
|
713 | 721 |
|
714 | 722 |
def acquire(self, shared=0, timeout=None, priority=None, |
715 | 723 |
test_notify=None): |
... | ... | |
800 | 808 |
self.__shr.remove(threading.currentThread()) |
801 | 809 |
|
802 | 810 |
# Notify topmost condition in queue |
803 |
(priority, prioqueue) = self.__find_first_pending_queue() |
|
804 |
if prioqueue: |
|
805 |
cond = prioqueue[0] |
|
806 |
cond.notifyAll() |
|
807 |
if cond.shared: |
|
808 |
# Prevent further shared acquires from sneaking in while waiters are |
|
809 |
# notified |
|
810 |
self.__pending_shared.pop(priority, None) |
|
811 |
self.__notify_topmost() |
|
812 |
finally: |
|
813 |
self.__lock.release() |
|
814 |
|
|
815 |
def __notify_topmost(self): |
|
816 |
"""Notifies topmost condition in queue of pending acquires. |
|
817 |
|
|
818 |
""" |
|
819 |
(priority, prioqueue) = self.__find_first_pending_queue() |
|
820 |
if prioqueue: |
|
821 |
cond = prioqueue[0] |
|
822 |
cond.notifyAll() |
|
823 |
if cond.shared: |
|
824 |
# Prevent further shared acquires from sneaking in while waiters are |
|
825 |
# notified |
|
826 |
self.__pending_shared.pop(priority, None) |
|
827 |
|
|
828 |
def _notify_topmost(self): |
|
829 |
"""Exported version of L{__notify_topmost}. |
|
811 | 830 |
|
831 |
""" |
|
832 |
self.__lock.acquire() |
|
833 |
try: |
|
834 |
return self.__notify_topmost() |
|
812 | 835 |
finally: |
813 | 836 |
self.__lock.release() |
814 | 837 |
|
Also available in: Unified diff