Revision 887c7aa6
b/doc/design-2.3.rst | ||
---|---|---|
205 | 205 |
a restart or crash of the master daemon. |
206 | 206 |
|
207 | 207 |
Priorities also need to be considered inside the locking library to |
208 |
ensure opcodes with higher priorities get locks first, but the design
|
|
209 |
changes for this will be discussed in a separate section.
|
|
208 |
ensure opcodes with higher priorities get locks first. See
|
|
209 |
:ref:`locking priorities <locking-priorities>` for more details.
|
|
210 | 210 |
|
211 | 211 |
Worker pool |
212 | 212 |
+++++++++++ |
... | ... | |
243 | 243 |
With these changes, the job queue will be able to implement per-job |
244 | 244 |
priorities. |
245 | 245 |
|
246 |
.. _locking-priorities: |
|
247 |
|
|
248 |
Locking |
|
249 |
+++++++ |
|
250 |
|
|
251 |
In order to support priorities in Ganeti's own lock classes, |
|
252 |
``locking.SharedLock`` and ``locking.LockSet``, the internal structure |
|
253 |
of the former class needs to be changed. The last major change in this |
|
254 |
area was done for Ganeti 2.1 and can be found in the respective |
|
255 |
:doc:`design document <design-2.1>`. |
|
256 |
|
|
257 |
The plain list (``[]``) used as a queue is replaced by a heap queue, |
|
258 |
similar to the `worker pool`_. The heap or priority queue does automatic |
|
259 |
sorting, thereby automatically taking care of priorities. For each |
|
260 |
priority there's a plain list with pending acquires, like the single |
|
261 |
queue of pending acquires before this change. |
|
262 |
|
|
263 |
When the lock is released, the code locates the list of pending acquires |
|
264 |
for the highest priority waiting. The first condition (index 0) is |
|
265 |
notified. Once all waiting threads received the notification, the |
|
266 |
condition is removed from the list. If the list of conditions is empty |
|
267 |
it's removed from the heap queue. |
|
268 |
|
|
269 |
Like before, shared acquires are grouped and skip ahead of exclusive |
|
270 |
acquires if there's already an existing shared acquire for a priority. |
|
271 |
To accomplish this, a separate dictionary of shared acquires per |
|
272 |
priority is maintained. |
|
273 |
|
|
274 |
To simplify the code and reduce memory consumption, the concept of the |
|
275 |
"active" and "inactive" condition for shared acquires is abolished. The |
|
276 |
lock can't predict what priorities the next acquires will use and even |
|
277 |
keeping a cache can become computationally expensive for arguable |
|
278 |
benefit (the underlying POSIX pipe, see ``pipe(2)``, needs to be |
|
279 |
re-created for each notification anyway). |
|
280 |
|
|
281 |
The following diagram shows a possible state of the internal queue from |
|
282 |
a high-level view. Conditions are shown as (waiting) threads. Assuming |
|
283 |
no modifications are made to the queue (e.g. more acquires or timeouts), |
|
284 |
the lock would be acquired by the threads in this order (concurrent |
|
285 |
acquires in parentheses): ``threadE1``, ``threadE2``, (``threadS1``, |
|
286 |
``threadS2``, ``threadS3``), (``threadS4``, ``threadS5``), ``threadE3``, |
|
287 |
``threadS6``, ``threadE4``, ``threadE5``. |
|
288 |
|
|
289 |
:: |
|
290 |
|
|
291 |
[ |
|
292 |
(0, [exc/threadE1, exc/threadE2, shr/threadS1/threadS2/threadS3]), |
|
293 |
(2, [shr/threadS4/threadS5]), |
|
294 |
(10, [exc/threadE3]), |
|
295 |
(33, [shr/threadS6, exc/threadE4, exc/threadE5]), |
|
296 |
] |
|
297 |
|
|
298 |
|
|
246 | 299 |
IPv6 support |
247 | 300 |
------------ |
248 | 301 |
|
b/lib/locking.py | ||
---|---|---|
32 | 32 |
import errno |
33 | 33 |
import weakref |
34 | 34 |
import logging |
35 |
import heapq |
|
35 | 36 |
|
36 | 37 |
from ganeti import errors |
37 | 38 |
from ganeti import utils |
... | ... | |
41 | 42 |
_EXCLUSIVE_TEXT = "exclusive" |
42 | 43 |
_SHARED_TEXT = "shared" |
43 | 44 |
|
45 |
_DEFAULT_PRIORITY = 0 |
|
46 |
|
|
44 | 47 |
|
45 | 48 |
def ssynchronized(mylock, shared=0): |
46 | 49 |
"""Shared Synchronization decorator. |
... | ... | |
406 | 409 |
return bool(self._waiters) |
407 | 410 |
|
408 | 411 |
|
412 |
class _PipeConditionWithMode(PipeCondition): |
|
413 |
__slots__ = [ |
|
414 |
"shared", |
|
415 |
] |
|
416 |
|
|
417 |
def __init__(self, lock, shared): |
|
418 |
"""Initializes this class. |
|
419 |
|
|
420 |
""" |
|
421 |
self.shared = shared |
|
422 |
PipeCondition.__init__(self, lock) |
|
423 |
|
|
424 |
|
|
409 | 425 |
class SharedLock(object): |
410 | 426 |
"""Implements a shared lock. |
411 | 427 |
|
... | ... | |
413 | 429 |
acquire_shared(). In order to acquire the lock in an exclusive way threads |
414 | 430 |
can call acquire_exclusive(). |
415 | 431 |
|
416 |
The lock prevents starvation but does not guarantee that threads will acquire |
|
417 |
the shared lock in the order they queued for it, just that they will |
|
418 |
eventually do so. |
|
432 |
Notes on data structures: C{__pending} contains a priority queue (heapq) of |
|
433 |
all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2), |
|
434 |
...]}. Each per-priority queue contains a normal in-order list of conditions |
|
435 |
to be notified when the lock can be acquired. Shared locks are grouped |
|
436 |
together by priority and the condition for them is stored in |
|
437 |
C{__pending_shared} if it already exists. C{__pending_by_prio} keeps |
|
438 |
references for the per-priority queues indexed by priority for faster access. |
|
419 | 439 |
|
420 | 440 |
@type name: string |
421 | 441 |
@ivar name: the name of the lock |
... | ... | |
423 | 443 |
""" |
424 | 444 |
__slots__ = [ |
425 | 445 |
"__weakref__", |
426 |
"__active_shr_c", |
|
427 |
"__inactive_shr_c", |
|
428 | 446 |
"__deleted", |
429 | 447 |
"__exc", |
430 | 448 |
"__lock", |
431 | 449 |
"__pending", |
450 |
"__pending_by_prio", |
|
451 |
"__pending_shared", |
|
432 | 452 |
"__shr", |
433 | 453 |
"name", |
434 | 454 |
] |
435 | 455 |
|
436 |
__condition_class = PipeCondition
|
|
456 |
__condition_class = _PipeConditionWithMode
|
|
437 | 457 |
|
438 | 458 |
def __init__(self, name, monitor=None): |
439 | 459 |
"""Construct a new SharedLock. |
... | ... | |
452 | 472 |
|
453 | 473 |
# Queue containing waiting acquires |
454 | 474 |
self.__pending = [] |
455 |
|
|
456 |
# Active and inactive conditions for shared locks |
|
457 |
self.__active_shr_c = self.__condition_class(self.__lock) |
|
458 |
self.__inactive_shr_c = self.__condition_class(self.__lock) |
|
475 |
self.__pending_by_prio = {} |
|
476 |
self.__pending_shared = {} |
|
459 | 477 |
|
460 | 478 |
# Current lock holders |
461 | 479 |
self.__shr = set() |
... | ... | |
509 | 527 |
elif fname == "pending": |
510 | 528 |
data = [] |
511 | 529 |
|
512 |
for cond in self.__pending: |
|
513 |
if cond in (self.__active_shr_c, self.__inactive_shr_c): |
|
514 |
mode = _SHARED_TEXT |
|
515 |
else: |
|
516 |
mode = _EXCLUSIVE_TEXT |
|
530 |
# Sorting instead of copying and using heaq functions for simplicity |
|
531 |
for (_, prioqueue) in sorted(self.__pending): |
|
532 |
for cond in prioqueue: |
|
533 |
if cond.shared: |
|
534 |
mode = _SHARED_TEXT |
|
535 |
else: |
|
536 |
mode = _EXCLUSIVE_TEXT |
|
517 | 537 |
|
518 |
# This function should be fast as it runs with the lock held. Hence
|
|
519 |
# not using utils.NiceSort.
|
|
520 |
data.append((mode, sorted([i.getName()
|
|
521 |
for i in cond.get_waiting()])))
|
|
538 |
# This function should be fast as it runs with the lock held.
|
|
539 |
# Hence not using utils.NiceSort.
|
|
540 |
data.append((mode, sorted(i.getName()
|
|
541 |
for i in cond.get_waiting())))
|
|
522 | 542 |
|
523 | 543 |
info.append(data) |
524 | 544 |
else: |
... | ... | |
584 | 604 |
""" |
585 | 605 |
self.__lock.acquire() |
586 | 606 |
try: |
587 |
return len(self.__pending) |
|
607 |
return sum(len(prioqueue) for (_, prioqueue) in self.__pending) |
|
608 |
finally: |
|
609 |
self.__lock.release() |
|
610 |
|
|
611 |
def _check_empty(self): |
|
612 |
"""Checks whether there are any pending acquires. |
|
613 |
|
|
614 |
@rtype: bool |
|
615 |
|
|
616 |
""" |
|
617 |
self.__lock.acquire() |
|
618 |
try: |
|
619 |
# Order is important: __find_first_pending_queue modifies __pending |
|
620 |
return not (self.__find_first_pending_queue() or |
|
621 |
self.__pending or |
|
622 |
self.__pending_by_prio or |
|
623 |
self.__pending_shared) |
|
588 | 624 |
finally: |
589 | 625 |
self.__lock.release() |
590 | 626 |
|
... | ... | |
606 | 642 |
else: |
607 | 643 |
return len(self.__shr) == 0 and self.__exc is None |
608 | 644 |
|
645 |
def __find_first_pending_queue(self): |
|
646 |
"""Tries to find the topmost queued entry with pending acquires. |
|
647 |
|
|
648 |
Removes empty entries while going through the list. |
|
649 |
|
|
650 |
""" |
|
651 |
while self.__pending: |
|
652 |
(priority, prioqueue) = self.__pending[0] |
|
653 |
|
|
654 |
if not prioqueue: |
|
655 |
heapq.heappop(self.__pending) |
|
656 |
del self.__pending_by_prio[priority] |
|
657 |
assert priority not in self.__pending_shared |
|
658 |
continue |
|
659 |
|
|
660 |
if prioqueue: |
|
661 |
return prioqueue |
|
662 |
|
|
663 |
return None |
|
664 |
|
|
609 | 665 |
def __is_on_top(self, cond): |
610 | 666 |
"""Checks whether the passed condition is on top of the queue. |
611 | 667 |
|
612 | 668 |
The caller must make sure the queue isn't empty. |
613 | 669 |
|
614 | 670 |
""" |
615 |
return self.__pending[0] == cond
|
|
671 |
return cond == self.__find_first_pending_queue()[0]
|
|
616 | 672 |
|
617 |
def __acquire_unlocked(self, shared, timeout): |
|
673 |
def __acquire_unlocked(self, shared, timeout, priority):
|
|
618 | 674 |
"""Acquire a shared lock. |
619 | 675 |
|
620 | 676 |
@param shared: whether to acquire in shared mode; by default an |
621 | 677 |
exclusive lock will be acquired |
622 | 678 |
@param timeout: maximum waiting time before giving up |
679 |
@type priority: integer |
|
680 |
@param priority: Priority for acquiring lock |
|
623 | 681 |
|
624 | 682 |
""" |
625 | 683 |
self.__check_deleted() |
... | ... | |
628 | 686 |
assert not self.__is_owned(), ("double acquire() on a non-recursive lock" |
629 | 687 |
" %s" % self.name) |
630 | 688 |
|
689 |
# Remove empty entries from queue |
|
690 |
self.__find_first_pending_queue() |
|
691 |
|
|
631 | 692 |
# Check whether someone else holds the lock or there are pending acquires. |
632 | 693 |
if not self.__pending and self.__can_acquire(shared): |
633 | 694 |
# Apparently not, can acquire lock directly. |
634 | 695 |
self.__do_acquire(shared) |
635 | 696 |
return True |
636 | 697 |
|
637 |
if shared: |
|
638 |
wait_condition = self.__active_shr_c |
|
698 |
prioqueue = self.__pending_by_prio.get(priority, None) |
|
639 | 699 |
|
640 |
# Check if we're not yet in the queue |
|
641 |
if wait_condition not in self.__pending: |
|
642 |
self.__pending.append(wait_condition) |
|
700 |
if shared: |
|
701 |
# Try to re-use condition for shared acquire |
|
702 |
wait_condition = self.__pending_shared.get(priority, None) |
|
703 |
assert (wait_condition is None or |
|
704 |
(wait_condition.shared and wait_condition in prioqueue)) |
|
643 | 705 |
else: |
644 |
wait_condition = self.__condition_class(self.__lock) |
|
645 |
# Always add to queue |
|
646 |
self.__pending.append(wait_condition) |
|
706 |
wait_condition = None |
|
707 |
|
|
708 |
if wait_condition is None: |
|
709 |
if prioqueue is None: |
|
710 |
assert priority not in self.__pending_by_prio |
|
711 |
|
|
712 |
prioqueue = [] |
|
713 |
heapq.heappush(self.__pending, (priority, prioqueue)) |
|
714 |
self.__pending_by_prio[priority] = prioqueue |
|
715 |
|
|
716 |
wait_condition = self.__condition_class(self.__lock, shared) |
|
717 |
prioqueue.append(wait_condition) |
|
718 |
|
|
719 |
if shared: |
|
720 |
# Keep reference for further shared acquires on same priority. This is |
|
721 |
# better than trying to find it in the list of pending acquires. |
|
722 |
assert priority not in self.__pending_shared |
|
723 |
self.__pending_shared[priority] = wait_condition |
|
647 | 724 |
|
648 | 725 |
try: |
649 | 726 |
# Wait until we become the topmost acquire in the queue or the timeout |
650 | 727 |
# expires. |
728 |
# TODO: Decrease timeout with spurious notifications |
|
651 | 729 |
while not (self.__is_on_top(wait_condition) and |
652 | 730 |
self.__can_acquire(shared)): |
653 | 731 |
# Wait for notification |
... | ... | |
664 | 742 |
return True |
665 | 743 |
finally: |
666 | 744 |
# Remove condition from queue if there are no more waiters |
667 |
if not wait_condition.has_waiting() and not self.__deleted: |
|
668 |
self.__pending.remove(wait_condition) |
|
745 |
if not wait_condition.has_waiting(): |
|
746 |
prioqueue.remove(wait_condition) |
|
747 |
if wait_condition.shared: |
|
748 |
del self.__pending_shared[priority] |
|
669 | 749 |
|
670 | 750 |
return False |
671 | 751 |
|
672 |
def acquire(self, shared=0, timeout=None, test_notify=None): |
|
752 |
def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY, |
|
753 |
test_notify=None): |
|
673 | 754 |
"""Acquire a shared lock. |
674 | 755 |
|
675 | 756 |
@type shared: integer (0/1) used as a boolean |
... | ... | |
677 | 758 |
exclusive lock will be acquired |
678 | 759 |
@type timeout: float |
679 | 760 |
@param timeout: maximum waiting time before giving up |
761 |
@type priority: integer |
|
762 |
@param priority: Priority for acquiring lock |
|
680 | 763 |
@type test_notify: callable or None |
681 | 764 |
@param test_notify: Special callback function for unittesting |
682 | 765 |
|
... | ... | |
687 | 770 |
if __debug__ and callable(test_notify): |
688 | 771 |
test_notify() |
689 | 772 |
|
690 |
return self.__acquire_unlocked(shared, timeout) |
|
773 |
return self.__acquire_unlocked(shared, timeout, priority)
|
|
691 | 774 |
finally: |
692 | 775 |
self.__lock.release() |
693 | 776 |
|
... | ... | |
710 | 793 |
self.__shr.remove(threading.currentThread()) |
711 | 794 |
|
712 | 795 |
# Notify topmost condition in queue |
713 |
if self.__pending: |
|
714 |
first_condition = self.__pending[0] |
|
715 |
first_condition.notifyAll() |
|
716 |
|
|
717 |
if first_condition == self.__active_shr_c: |
|
718 |
self.__active_shr_c = self.__inactive_shr_c |
|
719 |
self.__inactive_shr_c = first_condition |
|
796 |
prioqueue = self.__find_first_pending_queue() |
|
797 |
if prioqueue: |
|
798 |
prioqueue[0].notifyAll() |
|
720 | 799 |
|
721 | 800 |
finally: |
722 | 801 |
self.__lock.release() |
723 | 802 |
|
724 |
def delete(self, timeout=None): |
|
803 |
def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
|
|
725 | 804 |
"""Delete a Shared Lock. |
726 | 805 |
|
727 | 806 |
This operation will declare the lock for removal. First the lock will be |
... | ... | |
730 | 809 |
|
731 | 810 |
@type timeout: float |
732 | 811 |
@param timeout: maximum waiting time before giving up |
812 |
@type priority: integer |
|
813 |
@param priority: Priority for acquiring lock |
|
733 | 814 |
|
734 | 815 |
""" |
735 | 816 |
self.__lock.acquire() |
... | ... | |
742 | 823 |
acquired = self.__is_exclusive() |
743 | 824 |
|
744 | 825 |
if not acquired: |
745 |
acquired = self.__acquire_unlocked(0, timeout) |
|
826 |
acquired = self.__acquire_unlocked(0, timeout, priority)
|
|
746 | 827 |
|
747 | 828 |
assert self.__is_exclusive() and not self.__is_sharer(), \ |
748 | 829 |
"Lock wasn't acquired in exclusive mode" |
... | ... | |
754 | 835 |
assert not (self.__exc or self.__shr), "Found owner during deletion" |
755 | 836 |
|
756 | 837 |
# Notify all acquires. They'll throw an error. |
757 |
while self.__pending: |
|
758 |
self.__pending.pop().notifyAll() |
|
838 |
for (_, prioqueue) in self.__pending: |
|
839 |
for cond in prioqueue: |
|
840 |
cond.notifyAll() |
|
841 |
|
|
842 |
assert self.__deleted |
|
759 | 843 |
|
760 | 844 |
return acquired |
761 | 845 |
finally: |
... | ... | |
908 | 992 |
self.__lock.release() |
909 | 993 |
return set(result) |
910 | 994 |
|
911 |
def acquire(self, names, timeout=None, shared=0, test_notify=None): |
|
995 |
def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY, |
|
996 |
test_notify=None): |
|
912 | 997 |
"""Acquire a set of resource locks. |
913 | 998 |
|
914 | 999 |
@type names: list of strings (or string) |
... | ... | |
919 | 1004 |
exclusive lock will be acquired |
920 | 1005 |
@type timeout: float or None |
921 | 1006 |
@param timeout: Maximum time to acquire all locks |
1007 |
@type priority: integer |
|
1008 |
@param priority: Priority for acquiring locks |
|
922 | 1009 |
@type test_notify: callable or None |
923 | 1010 |
@param test_notify: Special callback function for unittesting |
924 | 1011 |
|
... | ... | |
945 | 1032 |
if isinstance(names, basestring): |
946 | 1033 |
names = [names] |
947 | 1034 |
|
948 |
return self.__acquire_inner(names, False, shared, |
|
1035 |
return self.__acquire_inner(names, False, shared, priority,
|
|
949 | 1036 |
running_timeout.Remaining, test_notify) |
950 | 1037 |
|
951 | 1038 |
else: |
... | ... | |
954 | 1041 |
# Some of them may then be deleted later, but we'll cope with this. |
955 | 1042 |
# |
956 | 1043 |
# We'd like to acquire this lock in a shared way, as it's nice if |
957 |
# everybody else can use the instances at the same time. If are |
|
1044 |
# everybody else can use the instances at the same time. If we are
|
|
958 | 1045 |
# acquiring them exclusively though they won't be able to do this |
959 | 1046 |
# anyway, though, so we'll get the list lock exclusively as well in |
960 | 1047 |
# order to be able to do add() on the set while owning it. |
961 |
if not self.__lock.acquire(shared=shared, |
|
1048 |
if not self.__lock.acquire(shared=shared, priority=priority,
|
|
962 | 1049 |
timeout=running_timeout.Remaining()): |
963 | 1050 |
raise _AcquireTimeout() |
964 | 1051 |
try: |
965 | 1052 |
# note we own the set-lock |
966 | 1053 |
self._add_owned() |
967 | 1054 |
|
968 |
return self.__acquire_inner(self.__names(), True, shared, |
|
1055 |
return self.__acquire_inner(self.__names(), True, shared, priority,
|
|
969 | 1056 |
running_timeout.Remaining, test_notify) |
970 | 1057 |
except: |
971 | 1058 |
# We shouldn't have problems adding the lock to the owners list, but |
... | ... | |
978 | 1065 |
except _AcquireTimeout: |
979 | 1066 |
return None |
980 | 1067 |
|
981 |
def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify): |
|
1068 |
def __acquire_inner(self, names, want_all, shared, priority, |
|
1069 |
timeout_fn, test_notify): |
|
982 | 1070 |
"""Inner logic for acquiring a number of locks. |
983 | 1071 |
|
984 | 1072 |
@param names: Names of the locks to be acquired |
985 | 1073 |
@param want_all: Whether all locks in the set should be acquired |
986 | 1074 |
@param shared: Whether to acquire in shared mode |
987 | 1075 |
@param timeout_fn: Function returning remaining timeout |
1076 |
@param priority: Priority for acquiring locks |
|
988 | 1077 |
@param test_notify: Special callback function for unittesting |
989 | 1078 |
|
990 | 1079 |
""" |
... | ... | |
1028 | 1117 |
try: |
1029 | 1118 |
# raises LockError if the lock was deleted |
1030 | 1119 |
acq_success = lock.acquire(shared=shared, timeout=timeout, |
1120 |
priority=priority, |
|
1031 | 1121 |
test_notify=test_notify_fn) |
1032 | 1122 |
except errors.LockError: |
1033 | 1123 |
if want_all: |
... | ... | |
1146 | 1236 |
lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) |
1147 | 1237 |
|
1148 | 1238 |
if acquired: |
1239 |
# No need for priority or timeout here as this lock has just been |
|
1240 |
# created |
|
1149 | 1241 |
lock.acquire(shared=shared) |
1150 | 1242 |
# now the lock cannot be deleted, we have it! |
1151 | 1243 |
try: |
b/test/ganeti.locking_unittest.py | ||
---|---|---|
28 | 28 |
import Queue |
29 | 29 |
import threading |
30 | 30 |
import random |
31 |
import itertools |
|
31 | 32 |
|
32 | 33 |
from ganeti import locking |
33 | 34 |
from ganeti import errors |
34 | 35 |
from ganeti import utils |
36 |
from ganeti import compat |
|
35 | 37 |
|
36 | 38 |
import testutils |
37 | 39 |
|
... | ... | |
701 | 703 |
|
702 | 704 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
703 | 705 |
|
706 |
def testPriority(self): |
|
707 |
# Acquire in exclusive mode |
|
708 |
self.assert_(self.sl.acquire(shared=0)) |
|
709 |
|
|
710 |
# Queue acquires |
|
711 |
def _Acquire(prev, next, shared, priority, result): |
|
712 |
prev.wait() |
|
713 |
self.sl.acquire(shared=shared, priority=priority, test_notify=next.set) |
|
714 |
try: |
|
715 |
self.done.put(result) |
|
716 |
finally: |
|
717 |
self.sl.release() |
|
718 |
|
|
719 |
counter = itertools.count(0) |
|
720 |
priorities = range(-20, 30) |
|
721 |
first = threading.Event() |
|
722 |
prev = first |
|
723 |
|
|
724 |
# Data structure: |
|
725 |
# { |
|
726 |
# priority: |
|
727 |
# [(shared/exclusive, set(acquire names), set(pending threads)), |
|
728 |
# (shared/exclusive, ...), |
|
729 |
# ..., |
|
730 |
# ], |
|
731 |
# } |
|
732 |
perprio = {} |
|
733 |
|
|
734 |
# References shared acquire per priority in L{perprio}. Data structure: |
|
735 |
# { |
|
736 |
# priority: (shared=1, set(acquire names), set(pending threads)), |
|
737 |
# } |
|
738 |
prioshared = {} |
|
739 |
|
|
740 |
for seed in [4979, 9523, 14902, 32440]: |
|
741 |
# Use a deterministic random generator |
|
742 |
rnd = random.Random(seed) |
|
743 |
for priority in [rnd.choice(priorities) for _ in range(30)]: |
|
744 |
modes = [0, 1] |
|
745 |
rnd.shuffle(modes) |
|
746 |
for shared in modes: |
|
747 |
# Unique name |
|
748 |
acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority) |
|
749 |
|
|
750 |
ev = threading.Event() |
|
751 |
thread = self._addThread(target=_Acquire, |
|
752 |
args=(prev, ev, shared, priority, acqname)) |
|
753 |
prev = ev |
|
754 |
|
|
755 |
# Record expected aqcuire, see above for structure |
|
756 |
data = (shared, set([acqname]), set([thread])) |
|
757 |
priolist = perprio.setdefault(priority, []) |
|
758 |
if shared: |
|
759 |
priosh = prioshared.get(priority, None) |
|
760 |
if priosh: |
|
761 |
# Shared acquires are merged |
|
762 |
for i, j in zip(priosh[1:], data[1:]): |
|
763 |
i.update(j) |
|
764 |
assert data[0] == priosh[0] |
|
765 |
else: |
|
766 |
prioshared[priority] = data |
|
767 |
priolist.append(data) |
|
768 |
else: |
|
769 |
priolist.append(data) |
|
770 |
|
|
771 |
# Start all acquires and wait for them |
|
772 |
first.set() |
|
773 |
prev.wait() |
|
774 |
|
|
775 |
# Check lock information |
|
776 |
self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name]) |
|
777 |
self.assertEqual(self.sl.GetInfo(["mode", "owner"]), |
|
778 |
["exclusive", [threading.currentThread().getName()]]) |
|
779 |
self.assertEqual(self.sl.GetInfo(["name", "pending"]), |
|
780 |
[self.sl.name, |
|
781 |
[(["exclusive", "shared"][int(bool(shared))], |
|
782 |
sorted([t.getName() for t in threads])) |
|
783 |
for acquires in [perprio[i] |
|
784 |
for i in sorted(perprio.keys())] |
|
785 |
for (shared, _, threads) in acquires]]) |
|
786 |
|
|
787 |
# Let threads acquire the lock |
|
788 |
self.sl.release() |
|
789 |
|
|
790 |
# Wait for everything to finish |
|
791 |
self._waitThreads() |
|
792 |
|
|
793 |
self.assert_(self.sl._check_empty()) |
|
794 |
|
|
795 |
# Check acquires by priority |
|
796 |
for acquires in [perprio[i] for i in sorted(perprio.keys())]: |
|
797 |
for (_, names, _) in acquires: |
|
798 |
# For shared acquires, the set will contain 1..n entries. For exclusive |
|
799 |
# acquires only one. |
|
800 |
while names: |
|
801 |
names.remove(self.done.get_nowait()) |
|
802 |
self.assertFalse(compat.any(names for (_, names, _) in acquires)) |
|
803 |
|
|
804 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
805 |
|
|
704 | 806 |
|
705 | 807 |
class TestSharedLockInCondition(_ThreadedTestCase): |
706 | 808 |
"""SharedLock as a condition lock tests""" |
... | ... | |
1259 | 1361 |
self.assertEqual(self.done.get_nowait(), 'DONE') |
1260 | 1362 |
self._setUpLS() |
1261 | 1363 |
|
1364 |
def testPriority(self): |
|
1365 |
def _Acquire(prev, next, name, priority, success_fn): |
|
1366 |
prev.wait() |
|
1367 |
self.assert_(self.ls.acquire(name, shared=0, |
|
1368 |
priority=priority, |
|
1369 |
test_notify=lambda _: next.set())) |
|
1370 |
try: |
|
1371 |
success_fn() |
|
1372 |
finally: |
|
1373 |
self.ls.release() |
|
1374 |
|
|
1375 |
# Get all in exclusive mode |
|
1376 |
self.assert_(self.ls.acquire(locking.ALL_SET, shared=0)) |
|
1377 |
|
|
1378 |
done_two = Queue.Queue(0) |
|
1379 |
|
|
1380 |
first = threading.Event() |
|
1381 |
prev = first |
|
1382 |
|
|
1383 |
acquires = [("one", prio, self.done) for prio in range(1, 33)] |
|
1384 |
acquires.extend([("two", prio, done_two) for prio in range(1, 33)]) |
|
1385 |
|
|
1386 |
# Use a deterministic random generator |
|
1387 |
random.Random(741).shuffle(acquires) |
|
1388 |
|
|
1389 |
for (name, prio, done) in acquires: |
|
1390 |
ev = threading.Event() |
|
1391 |
self._addThread(target=_Acquire, |
|
1392 |
args=(prev, ev, name, prio, |
|
1393 |
compat.partial(done.put, "Prio%s" % prio))) |
|
1394 |
prev = ev |
|
1395 |
|
|
1396 |
# Start acquires |
|
1397 |
first.set() |
|
1398 |
|
|
1399 |
# Wait for last acquire to start |
|
1400 |
prev.wait() |
|
1401 |
|
|
1402 |
# Let threads acquire locks |
|
1403 |
self.ls.release() |
|
1404 |
|
|
1405 |
# Wait for threads to finish |
|
1406 |
self._waitThreads() |
|
1407 |
|
|
1408 |
for i in range(1, 33): |
|
1409 |
self.assertEqual(self.done.get_nowait(), "Prio%s" % i) |
|
1410 |
self.assertEqual(done_two.get_nowait(), "Prio%s" % i) |
|
1411 |
|
|
1412 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
1413 |
self.assertRaises(Queue.Empty, done_two.get_nowait) |
|
1414 |
|
|
1262 | 1415 |
|
1263 | 1416 |
class TestGanetiLockManager(_ThreadedTestCase): |
1264 | 1417 |
|
Also available in: Unified diff