Revision 887c7aa6 lib/locking.py
b/lib/locking.py | ||
---|---|---|
32 | 32 |
import errno |
33 | 33 |
import weakref |
34 | 34 |
import logging |
35 |
import heapq |
|
35 | 36 |
|
36 | 37 |
from ganeti import errors |
37 | 38 |
from ganeti import utils |
... | ... | |
41 | 42 |
_EXCLUSIVE_TEXT = "exclusive" |
42 | 43 |
_SHARED_TEXT = "shared" |
43 | 44 |
|
45 |
_DEFAULT_PRIORITY = 0 |
|
46 |
|
|
44 | 47 |
|
45 | 48 |
def ssynchronized(mylock, shared=0): |
46 | 49 |
"""Shared Synchronization decorator. |
... | ... | |
406 | 409 |
return bool(self._waiters) |
407 | 410 |
|
408 | 411 |
|
412 |
class _PipeConditionWithMode(PipeCondition): |
|
413 |
__slots__ = [ |
|
414 |
"shared", |
|
415 |
] |
|
416 |
|
|
417 |
def __init__(self, lock, shared): |
|
418 |
"""Initializes this class. |
|
419 |
|
|
420 |
""" |
|
421 |
self.shared = shared |
|
422 |
PipeCondition.__init__(self, lock) |
|
423 |
|
|
424 |
|
|
409 | 425 |
class SharedLock(object): |
410 | 426 |
"""Implements a shared lock. |
411 | 427 |
|
... | ... | |
413 | 429 |
acquire_shared(). In order to acquire the lock in an exclusive way threads |
414 | 430 |
can call acquire_exclusive(). |
415 | 431 |
|
416 |
The lock prevents starvation but does not guarantee that threads will acquire |
|
417 |
the shared lock in the order they queued for it, just that they will |
|
418 |
eventually do so. |
|
432 |
Notes on data structures: C{__pending} contains a priority queue (heapq) of |
|
433 |
all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2), |
|
434 |
...]}. Each per-priority queue contains a normal in-order list of conditions |
|
435 |
to be notified when the lock can be acquired. Shared locks are grouped |
|
436 |
together by priority and the condition for them is stored in |
|
437 |
C{__pending_shared} if it already exists. C{__pending_by_prio} keeps |
|
438 |
references for the per-priority queues indexed by priority for faster access. |
|
419 | 439 |
|
420 | 440 |
@type name: string |
421 | 441 |
@ivar name: the name of the lock |
... | ... | |
423 | 443 |
""" |
424 | 444 |
__slots__ = [ |
425 | 445 |
"__weakref__", |
426 |
"__active_shr_c", |
|
427 |
"__inactive_shr_c", |
|
428 | 446 |
"__deleted", |
429 | 447 |
"__exc", |
430 | 448 |
"__lock", |
431 | 449 |
"__pending", |
450 |
"__pending_by_prio", |
|
451 |
"__pending_shared", |
|
432 | 452 |
"__shr", |
433 | 453 |
"name", |
434 | 454 |
] |
435 | 455 |
|
436 |
__condition_class = PipeCondition
|
|
456 |
__condition_class = _PipeConditionWithMode
|
|
437 | 457 |
|
438 | 458 |
def __init__(self, name, monitor=None): |
439 | 459 |
"""Construct a new SharedLock. |
... | ... | |
452 | 472 |
|
453 | 473 |
# Queue containing waiting acquires |
454 | 474 |
self.__pending = [] |
455 |
|
|
456 |
# Active and inactive conditions for shared locks |
|
457 |
self.__active_shr_c = self.__condition_class(self.__lock) |
|
458 |
self.__inactive_shr_c = self.__condition_class(self.__lock) |
|
475 |
self.__pending_by_prio = {} |
|
476 |
self.__pending_shared = {} |
|
459 | 477 |
|
460 | 478 |
# Current lock holders |
461 | 479 |
self.__shr = set() |
... | ... | |
509 | 527 |
elif fname == "pending": |
510 | 528 |
data = [] |
511 | 529 |
|
512 |
for cond in self.__pending: |
|
513 |
if cond in (self.__active_shr_c, self.__inactive_shr_c): |
|
514 |
mode = _SHARED_TEXT |
|
515 |
else: |
|
516 |
mode = _EXCLUSIVE_TEXT |
|
530 |
# Sorting instead of copying and using heaq functions for simplicity |
|
531 |
for (_, prioqueue) in sorted(self.__pending): |
|
532 |
for cond in prioqueue: |
|
533 |
if cond.shared: |
|
534 |
mode = _SHARED_TEXT |
|
535 |
else: |
|
536 |
mode = _EXCLUSIVE_TEXT |
|
517 | 537 |
|
518 |
# This function should be fast as it runs with the lock held. Hence
|
|
519 |
# not using utils.NiceSort.
|
|
520 |
data.append((mode, sorted([i.getName()
|
|
521 |
for i in cond.get_waiting()])))
|
|
538 |
# This function should be fast as it runs with the lock held.
|
|
539 |
# Hence not using utils.NiceSort.
|
|
540 |
data.append((mode, sorted(i.getName()
|
|
541 |
for i in cond.get_waiting())))
|
|
522 | 542 |
|
523 | 543 |
info.append(data) |
524 | 544 |
else: |
... | ... | |
584 | 604 |
""" |
585 | 605 |
self.__lock.acquire() |
586 | 606 |
try: |
587 |
return len(self.__pending) |
|
607 |
return sum(len(prioqueue) for (_, prioqueue) in self.__pending) |
|
608 |
finally: |
|
609 |
self.__lock.release() |
|
610 |
|
|
611 |
def _check_empty(self): |
|
612 |
"""Checks whether there are any pending acquires. |
|
613 |
|
|
614 |
@rtype: bool |
|
615 |
|
|
616 |
""" |
|
617 |
self.__lock.acquire() |
|
618 |
try: |
|
619 |
# Order is important: __find_first_pending_queue modifies __pending |
|
620 |
return not (self.__find_first_pending_queue() or |
|
621 |
self.__pending or |
|
622 |
self.__pending_by_prio or |
|
623 |
self.__pending_shared) |
|
588 | 624 |
finally: |
589 | 625 |
self.__lock.release() |
590 | 626 |
|
... | ... | |
606 | 642 |
else: |
607 | 643 |
return len(self.__shr) == 0 and self.__exc is None |
608 | 644 |
|
645 |
def __find_first_pending_queue(self): |
|
646 |
"""Tries to find the topmost queued entry with pending acquires. |
|
647 |
|
|
648 |
Removes empty entries while going through the list. |
|
649 |
|
|
650 |
""" |
|
651 |
while self.__pending: |
|
652 |
(priority, prioqueue) = self.__pending[0] |
|
653 |
|
|
654 |
if not prioqueue: |
|
655 |
heapq.heappop(self.__pending) |
|
656 |
del self.__pending_by_prio[priority] |
|
657 |
assert priority not in self.__pending_shared |
|
658 |
continue |
|
659 |
|
|
660 |
if prioqueue: |
|
661 |
return prioqueue |
|
662 |
|
|
663 |
return None |
|
664 |
|
|
609 | 665 |
def __is_on_top(self, cond): |
610 | 666 |
"""Checks whether the passed condition is on top of the queue. |
611 | 667 |
|
612 | 668 |
The caller must make sure the queue isn't empty. |
613 | 669 |
|
614 | 670 |
""" |
615 |
return self.__pending[0] == cond
|
|
671 |
return cond == self.__find_first_pending_queue()[0]
|
|
616 | 672 |
|
617 |
def __acquire_unlocked(self, shared, timeout): |
|
673 |
def __acquire_unlocked(self, shared, timeout, priority):
|
|
618 | 674 |
"""Acquire a shared lock. |
619 | 675 |
|
620 | 676 |
@param shared: whether to acquire in shared mode; by default an |
621 | 677 |
exclusive lock will be acquired |
622 | 678 |
@param timeout: maximum waiting time before giving up |
679 |
@type priority: integer |
|
680 |
@param priority: Priority for acquiring lock |
|
623 | 681 |
|
624 | 682 |
""" |
625 | 683 |
self.__check_deleted() |
... | ... | |
628 | 686 |
assert not self.__is_owned(), ("double acquire() on a non-recursive lock" |
629 | 687 |
" %s" % self.name) |
630 | 688 |
|
689 |
# Remove empty entries from queue |
|
690 |
self.__find_first_pending_queue() |
|
691 |
|
|
631 | 692 |
# Check whether someone else holds the lock or there are pending acquires. |
632 | 693 |
if not self.__pending and self.__can_acquire(shared): |
633 | 694 |
# Apparently not, can acquire lock directly. |
634 | 695 |
self.__do_acquire(shared) |
635 | 696 |
return True |
636 | 697 |
|
637 |
if shared: |
|
638 |
wait_condition = self.__active_shr_c |
|
698 |
prioqueue = self.__pending_by_prio.get(priority, None) |
|
639 | 699 |
|
640 |
# Check if we're not yet in the queue |
|
641 |
if wait_condition not in self.__pending: |
|
642 |
self.__pending.append(wait_condition) |
|
700 |
if shared: |
|
701 |
# Try to re-use condition for shared acquire |
|
702 |
wait_condition = self.__pending_shared.get(priority, None) |
|
703 |
assert (wait_condition is None or |
|
704 |
(wait_condition.shared and wait_condition in prioqueue)) |
|
643 | 705 |
else: |
644 |
wait_condition = self.__condition_class(self.__lock) |
|
645 |
# Always add to queue |
|
646 |
self.__pending.append(wait_condition) |
|
706 |
wait_condition = None |
|
707 |
|
|
708 |
if wait_condition is None: |
|
709 |
if prioqueue is None: |
|
710 |
assert priority not in self.__pending_by_prio |
|
711 |
|
|
712 |
prioqueue = [] |
|
713 |
heapq.heappush(self.__pending, (priority, prioqueue)) |
|
714 |
self.__pending_by_prio[priority] = prioqueue |
|
715 |
|
|
716 |
wait_condition = self.__condition_class(self.__lock, shared) |
|
717 |
prioqueue.append(wait_condition) |
|
718 |
|
|
719 |
if shared: |
|
720 |
# Keep reference for further shared acquires on same priority. This is |
|
721 |
# better than trying to find it in the list of pending acquires. |
|
722 |
assert priority not in self.__pending_shared |
|
723 |
self.__pending_shared[priority] = wait_condition |
|
647 | 724 |
|
648 | 725 |
try: |
649 | 726 |
# Wait until we become the topmost acquire in the queue or the timeout |
650 | 727 |
# expires. |
728 |
# TODO: Decrease timeout with spurious notifications |
|
651 | 729 |
while not (self.__is_on_top(wait_condition) and |
652 | 730 |
self.__can_acquire(shared)): |
653 | 731 |
# Wait for notification |
... | ... | |
664 | 742 |
return True |
665 | 743 |
finally: |
666 | 744 |
# Remove condition from queue if there are no more waiters |
667 |
if not wait_condition.has_waiting() and not self.__deleted: |
|
668 |
self.__pending.remove(wait_condition) |
|
745 |
if not wait_condition.has_waiting(): |
|
746 |
prioqueue.remove(wait_condition) |
|
747 |
if wait_condition.shared: |
|
748 |
del self.__pending_shared[priority] |
|
669 | 749 |
|
670 | 750 |
return False |
671 | 751 |
|
672 |
def acquire(self, shared=0, timeout=None, test_notify=None): |
|
752 |
def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY, |
|
753 |
test_notify=None): |
|
673 | 754 |
"""Acquire a shared lock. |
674 | 755 |
|
675 | 756 |
@type shared: integer (0/1) used as a boolean |
... | ... | |
677 | 758 |
exclusive lock will be acquired |
678 | 759 |
@type timeout: float |
679 | 760 |
@param timeout: maximum waiting time before giving up |
761 |
@type priority: integer |
|
762 |
@param priority: Priority for acquiring lock |
|
680 | 763 |
@type test_notify: callable or None |
681 | 764 |
@param test_notify: Special callback function for unittesting |
682 | 765 |
|
... | ... | |
687 | 770 |
if __debug__ and callable(test_notify): |
688 | 771 |
test_notify() |
689 | 772 |
|
690 |
return self.__acquire_unlocked(shared, timeout) |
|
773 |
return self.__acquire_unlocked(shared, timeout, priority)
|
|
691 | 774 |
finally: |
692 | 775 |
self.__lock.release() |
693 | 776 |
|
... | ... | |
710 | 793 |
self.__shr.remove(threading.currentThread()) |
711 | 794 |
|
712 | 795 |
# Notify topmost condition in queue |
713 |
if self.__pending: |
|
714 |
first_condition = self.__pending[0] |
|
715 |
first_condition.notifyAll() |
|
716 |
|
|
717 |
if first_condition == self.__active_shr_c: |
|
718 |
self.__active_shr_c = self.__inactive_shr_c |
|
719 |
self.__inactive_shr_c = first_condition |
|
796 |
prioqueue = self.__find_first_pending_queue() |
|
797 |
if prioqueue: |
|
798 |
prioqueue[0].notifyAll() |
|
720 | 799 |
|
721 | 800 |
finally: |
722 | 801 |
self.__lock.release() |
723 | 802 |
|
724 |
def delete(self, timeout=None): |
|
803 |
def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
|
|
725 | 804 |
"""Delete a Shared Lock. |
726 | 805 |
|
727 | 806 |
This operation will declare the lock for removal. First the lock will be |
... | ... | |
730 | 809 |
|
731 | 810 |
@type timeout: float |
732 | 811 |
@param timeout: maximum waiting time before giving up |
812 |
@type priority: integer |
|
813 |
@param priority: Priority for acquiring lock |
|
733 | 814 |
|
734 | 815 |
""" |
735 | 816 |
self.__lock.acquire() |
... | ... | |
742 | 823 |
acquired = self.__is_exclusive() |
743 | 824 |
|
744 | 825 |
if not acquired: |
745 |
acquired = self.__acquire_unlocked(0, timeout) |
|
826 |
acquired = self.__acquire_unlocked(0, timeout, priority)
|
|
746 | 827 |
|
747 | 828 |
assert self.__is_exclusive() and not self.__is_sharer(), \ |
748 | 829 |
"Lock wasn't acquired in exclusive mode" |
... | ... | |
754 | 835 |
assert not (self.__exc or self.__shr), "Found owner during deletion" |
755 | 836 |
|
756 | 837 |
# Notify all acquires. They'll throw an error. |
757 |
while self.__pending: |
|
758 |
self.__pending.pop().notifyAll() |
|
838 |
for (_, prioqueue) in self.__pending: |
|
839 |
for cond in prioqueue: |
|
840 |
cond.notifyAll() |
|
841 |
|
|
842 |
assert self.__deleted |
|
759 | 843 |
|
760 | 844 |
return acquired |
761 | 845 |
finally: |
... | ... | |
908 | 992 |
self.__lock.release() |
909 | 993 |
return set(result) |
910 | 994 |
|
911 |
def acquire(self, names, timeout=None, shared=0, test_notify=None): |
|
995 |
def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY, |
|
996 |
test_notify=None): |
|
912 | 997 |
"""Acquire a set of resource locks. |
913 | 998 |
|
914 | 999 |
@type names: list of strings (or string) |
... | ... | |
919 | 1004 |
exclusive lock will be acquired |
920 | 1005 |
@type timeout: float or None |
921 | 1006 |
@param timeout: Maximum time to acquire all locks |
1007 |
@type priority: integer |
|
1008 |
@param priority: Priority for acquiring locks |
|
922 | 1009 |
@type test_notify: callable or None |
923 | 1010 |
@param test_notify: Special callback function for unittesting |
924 | 1011 |
|
... | ... | |
945 | 1032 |
if isinstance(names, basestring): |
946 | 1033 |
names = [names] |
947 | 1034 |
|
948 |
return self.__acquire_inner(names, False, shared, |
|
1035 |
return self.__acquire_inner(names, False, shared, priority,
|
|
949 | 1036 |
running_timeout.Remaining, test_notify) |
950 | 1037 |
|
951 | 1038 |
else: |
... | ... | |
954 | 1041 |
# Some of them may then be deleted later, but we'll cope with this. |
955 | 1042 |
# |
956 | 1043 |
# We'd like to acquire this lock in a shared way, as it's nice if |
957 |
# everybody else can use the instances at the same time. If are |
|
1044 |
# everybody else can use the instances at the same time. If we are
|
|
958 | 1045 |
# acquiring them exclusively though they won't be able to do this |
959 | 1046 |
# anyway, though, so we'll get the list lock exclusively as well in |
960 | 1047 |
# order to be able to do add() on the set while owning it. |
961 |
if not self.__lock.acquire(shared=shared, |
|
1048 |
if not self.__lock.acquire(shared=shared, priority=priority,
|
|
962 | 1049 |
timeout=running_timeout.Remaining()): |
963 | 1050 |
raise _AcquireTimeout() |
964 | 1051 |
try: |
965 | 1052 |
# note we own the set-lock |
966 | 1053 |
self._add_owned() |
967 | 1054 |
|
968 |
return self.__acquire_inner(self.__names(), True, shared, |
|
1055 |
return self.__acquire_inner(self.__names(), True, shared, priority,
|
|
969 | 1056 |
running_timeout.Remaining, test_notify) |
970 | 1057 |
except: |
971 | 1058 |
# We shouldn't have problems adding the lock to the owners list, but |
... | ... | |
978 | 1065 |
except _AcquireTimeout: |
979 | 1066 |
return None |
980 | 1067 |
|
981 |
def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify): |
|
1068 |
def __acquire_inner(self, names, want_all, shared, priority, |
|
1069 |
timeout_fn, test_notify): |
|
982 | 1070 |
"""Inner logic for acquiring a number of locks. |
983 | 1071 |
|
984 | 1072 |
@param names: Names of the locks to be acquired |
985 | 1073 |
@param want_all: Whether all locks in the set should be acquired |
986 | 1074 |
@param shared: Whether to acquire in shared mode |
987 | 1075 |
@param timeout_fn: Function returning remaining timeout |
1076 |
@param priority: Priority for acquiring locks |
|
988 | 1077 |
@param test_notify: Special callback function for unittesting |
989 | 1078 |
|
990 | 1079 |
""" |
... | ... | |
1028 | 1117 |
try: |
1029 | 1118 |
# raises LockError if the lock was deleted |
1030 | 1119 |
acq_success = lock.acquire(shared=shared, timeout=timeout, |
1120 |
priority=priority, |
|
1031 | 1121 |
test_notify=test_notify_fn) |
1032 | 1122 |
except errors.LockError: |
1033 | 1123 |
if want_all: |
... | ... | |
1146 | 1236 |
lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) |
1147 | 1237 |
|
1148 | 1238 |
if acquired: |
1239 |
# No need for priority or timeout here as this lock has just been |
|
1240 |
# created |
|
1149 | 1241 |
lock.acquire(shared=shared) |
1150 | 1242 |
# now the lock cannot be deleted, we have it! |
1151 | 1243 |
try: |
Also available in: Unified diff