Revision 3dbe3ddf
b/lib/locking.py | ||
---|---|---|
727 | 727 |
finally: |
728 | 728 |
self.__lock.release() |
729 | 729 |
|
730 |
def downgrade(self): |
|
731 |
"""Changes the lock mode from exclusive to shared. |
|
732 |
|
|
733 |
Pending acquires in shared mode on the same priority will go ahead. |
|
734 |
|
|
735 |
""" |
|
736 |
self.__lock.acquire() |
|
737 |
try: |
|
738 |
assert self.__is_owned(), "Lock must be owned" |
|
739 |
|
|
740 |
if self.__is_exclusive(): |
|
741 |
# Do nothing if the lock is already acquired in shared mode |
|
742 |
self.__exc = None |
|
743 |
self.__do_acquire(1) |
|
744 |
|
|
745 |
# Important: pending shared acquires should only jump ahead if there |
|
746 |
# was a transition from exclusive to shared, otherwise an owner of a |
|
747 |
# shared lock can keep calling this function to push incoming shared |
|
748 |
# acquires |
|
749 |
(priority, prioqueue) = self.__find_first_pending_queue() |
|
750 |
if prioqueue: |
|
751 |
# Is there a pending shared acquire on this priority? |
|
752 |
cond = self.__pending_shared.pop(priority, None) |
|
753 |
if cond: |
|
754 |
assert cond.shared |
|
755 |
assert cond in prioqueue |
|
756 |
|
|
757 |
# Ensure shared acquire is on top of queue |
|
758 |
if len(prioqueue) > 1: |
|
759 |
prioqueue.remove(cond) |
|
760 |
prioqueue.insert(0, cond) |
|
761 |
|
|
762 |
# Notify |
|
763 |
cond.notifyAll() |
|
764 |
|
|
765 |
assert not self.__is_exclusive() |
|
766 |
assert self.__is_sharer() |
|
767 |
|
|
768 |
return True |
|
769 |
finally: |
|
770 |
self.__lock.release() |
|
771 |
|
|
730 | 772 |
def release(self): |
731 | 773 |
"""Release a Shared Lock. |
732 | 774 |
|
... | ... | |
881 | 923 |
""" |
882 | 924 |
return "%s/%s" % (self.name, mname) |
883 | 925 |
|
926 |
def _get_lock(self): |
|
927 |
"""Returns the lockset-internal lock. |
|
928 |
|
|
929 |
""" |
|
930 |
return self.__lock |
|
931 |
|
|
932 |
def _get_lockdict(self): |
|
933 |
"""Returns the lockset-internal lock dictionary. |
|
934 |
|
|
935 |
Accessing this structure is only safe in single-thread usage or when the |
|
936 |
lockset-internal lock is held. |
|
937 |
|
|
938 |
""" |
|
939 |
return self.__lockdict |
|
940 |
|
|
884 | 941 |
def _is_owned(self): |
885 | 942 |
"""Is the current thread a current level owner?""" |
886 | 943 |
return threading.currentThread() in self.__owners |
... | ... | |
1122 | 1179 |
|
1123 | 1180 |
return acquired |
1124 | 1181 |
|
1182 |
def downgrade(self, names=None): |
|
1183 |
"""Downgrade a set of resource locks from exclusive to shared mode. |
|
1184 |
|
|
1185 |
The locks must have been acquired in exclusive mode. |
|
1186 |
|
|
1187 |
""" |
|
1188 |
assert self._is_owned(), ("downgrade on lockset %s while not owning any" |
|
1189 |
" lock" % self.name) |
|
1190 |
|
|
1191 |
# Support passing in a single resource to downgrade rather than many |
|
1192 |
if isinstance(names, basestring): |
|
1193 |
names = [names] |
|
1194 |
|
|
1195 |
owned = self._list_owned() |
|
1196 |
|
|
1197 |
if names is None: |
|
1198 |
names = owned |
|
1199 |
else: |
|
1200 |
names = set(names) |
|
1201 |
assert owned.issuperset(names), \ |
|
1202 |
("downgrade() on unheld resources %s (set %s)" % |
|
1203 |
(names.difference(owned), self.name)) |
|
1204 |
|
|
1205 |
for lockname in names: |
|
1206 |
self.__lockdict[lockname].downgrade() |
|
1207 |
|
|
1208 |
# Do we own the lockset in exclusive mode? |
|
1209 |
if self.__lock._is_owned(shared=0): |
|
1210 |
# Have all locks been downgraded? |
|
1211 |
if not compat.any(lock._is_owned(shared=0) |
|
1212 |
for lock in self.__lockdict.values()): |
|
1213 |
self.__lock.downgrade() |
|
1214 |
assert self.__lock._is_owned(shared=1) |
|
1215 |
|
|
1216 |
return True |
|
1217 |
|
|
1125 | 1218 |
def release(self, names=None): |
1126 | 1219 |
"""Release a set of resource locks, at the same level. |
1127 | 1220 |
|
... | ... | |
1458 | 1551 |
return self.__keyring[level].acquire(names, shared=shared, timeout=timeout, |
1459 | 1552 |
priority=priority) |
1460 | 1553 |
|
1554 |
def downgrade(self, level, names=None): |
|
1555 |
"""Downgrade a set of resource locks from exclusive to shared mode. |
|
1556 |
|
|
1557 |
You must have acquired the locks in exclusive mode. |
|
1558 |
|
|
1559 |
@type level: member of locking.LEVELS |
|
1560 |
@param level: the level at which the locks shall be downgraded |
|
1561 |
@type names: list of strings, or None |
|
1562 |
@param names: the names of the locks which shall be downgraded |
|
1563 |
(defaults to all the locks acquired at the level) |
|
1564 |
|
|
1565 |
""" |
|
1566 |
assert level in LEVELS, "Invalid locking level %s" % level |
|
1567 |
|
|
1568 |
return self.__keyring[level].downgrade(names=names) |
|
1569 |
|
|
1461 | 1570 |
def release(self, level, names=None): |
1462 | 1571 |
"""Release a set of resource locks, at the same level. |
1463 | 1572 |
|
b/test/ganeti.locking_unittest.py | ||
---|---|---|
612 | 612 |
|
613 | 613 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
614 | 614 |
|
615 |
def testIllegalDowngrade(self): |
|
616 |
# Not yet acquired |
|
617 |
self.assertRaises(AssertionError, self.sl.downgrade) |
|
618 |
|
|
619 |
# Acquire in shared mode, downgrade should be no-op |
|
620 |
self.assertTrue(self.sl.acquire(shared=1)) |
|
621 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
622 |
self.assertTrue(self.sl.downgrade()) |
|
623 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
624 |
self.sl.release() |
|
625 |
|
|
626 |
def testDowngrade(self): |
|
627 |
self.assertTrue(self.sl.acquire()) |
|
628 |
self.assertTrue(self.sl._is_owned(shared=0)) |
|
629 |
self.assertTrue(self.sl.downgrade()) |
|
630 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
631 |
self.sl.release() |
|
632 |
|
|
633 |
@_Repeat |
|
634 |
def testDowngradeJumpsAheadOfExclusive(self): |
|
635 |
def _KeepExclusive(ev_got, ev_downgrade, ev_release): |
|
636 |
self.assertTrue(self.sl.acquire()) |
|
637 |
self.assertTrue(self.sl._is_owned(shared=0)) |
|
638 |
ev_got.set() |
|
639 |
ev_downgrade.wait() |
|
640 |
self.assertTrue(self.sl._is_owned(shared=0)) |
|
641 |
self.assertTrue(self.sl.downgrade()) |
|
642 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
643 |
ev_release.wait() |
|
644 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
645 |
self.sl.release() |
|
646 |
|
|
647 |
def _KeepExclusive2(ev_started, ev_release): |
|
648 |
self.assertTrue(self.sl.acquire(test_notify=ev_started.set)) |
|
649 |
self.assertTrue(self.sl._is_owned(shared=0)) |
|
650 |
ev_release.wait() |
|
651 |
self.assertTrue(self.sl._is_owned(shared=0)) |
|
652 |
self.sl.release() |
|
653 |
|
|
654 |
def _KeepShared(ev_started, ev_got, ev_release): |
|
655 |
self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set)) |
|
656 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
657 |
ev_got.set() |
|
658 |
ev_release.wait() |
|
659 |
self.assertTrue(self.sl._is_owned(shared=1)) |
|
660 |
self.sl.release() |
|
661 |
|
|
662 |
# Acquire lock in exclusive mode |
|
663 |
ev_got_excl1 = threading.Event() |
|
664 |
ev_downgrade_excl1 = threading.Event() |
|
665 |
ev_release_excl1 = threading.Event() |
|
666 |
th_excl1 = self._addThread(target=_KeepExclusive, |
|
667 |
args=(ev_got_excl1, ev_downgrade_excl1, |
|
668 |
ev_release_excl1)) |
|
669 |
ev_got_excl1.wait() |
|
670 |
|
|
671 |
# Start a second exclusive acquire |
|
672 |
ev_started_excl2 = threading.Event() |
|
673 |
ev_release_excl2 = threading.Event() |
|
674 |
th_excl2 = self._addThread(target=_KeepExclusive2, |
|
675 |
args=(ev_started_excl2, ev_release_excl2)) |
|
676 |
ev_started_excl2.wait() |
|
677 |
|
|
678 |
# Start shared acquires, will jump ahead of second exclusive acquire when |
|
679 |
# first exclusive acquire downgrades |
|
680 |
ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)] |
|
681 |
ev_release_shared = threading.Event() |
|
682 |
|
|
683 |
th_shared = [self._addThread(target=_KeepShared, |
|
684 |
args=(ev_started, ev_got, ev_release_shared)) |
|
685 |
for (ev_started, ev_got) in ev_shared] |
|
686 |
|
|
687 |
# Wait for all shared acquires to start |
|
688 |
for (ev, _) in ev_shared: |
|
689 |
ev.wait() |
|
690 |
|
|
691 |
# Check lock information |
|
692 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])), |
|
693 |
(self.sl.name, "exclusive", [th_excl1.getName()], None)) |
|
694 |
(_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING])) |
|
695 |
self.assertEqual([(pendmode, sorted(waiting)) |
|
696 |
for (pendmode, waiting) in pending], |
|
697 |
[("exclusive", [th_excl2.getName()]), |
|
698 |
("shared", sorted(th.getName() for th in th_shared))]) |
|
699 |
|
|
700 |
# Shared acquires won't start until the exclusive lock is downgraded |
|
701 |
ev_downgrade_excl1.set() |
|
702 |
|
|
703 |
# Wait for all shared acquires to be successful |
|
704 |
for (_, ev) in ev_shared: |
|
705 |
ev.wait() |
|
706 |
|
|
707 |
# Check lock information again |
|
708 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])), |
|
709 |
(self.sl.name, "shared", None, |
|
710 |
[("exclusive", [th_excl2.getName()])])) |
|
711 |
(_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER])) |
|
712 |
self.assertEqual(set(owner), set([th_excl1.getName()] + |
|
713 |
[th.getName() for th in th_shared])) |
|
714 |
|
|
715 |
ev_release_excl1.set() |
|
716 |
ev_release_excl2.set() |
|
717 |
ev_release_shared.set() |
|
718 |
|
|
719 |
self._waitThreads() |
|
720 |
|
|
721 |
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER, |
|
722 |
query.LQ_PENDING])), |
|
723 |
(self.sl.name, None, None, [])) |
|
724 |
|
|
615 | 725 |
@_Repeat |
616 | 726 |
def testMixedAcquireTimeout(self): |
617 | 727 |
sync = threading.Event() |
... | ... | |
1374 | 1484 |
self.assertEqual(self.done.get_nowait(), 'DONE') |
1375 | 1485 |
self._setUpLS() |
1376 | 1486 |
|
1487 |
def testAcquireWithNamesDowngrade(self): |
|
1488 |
self.assertEquals(self.ls.acquire("two", shared=0), set(["two"])) |
|
1489 |
self.assertTrue(self.ls._is_owned()) |
|
1490 |
self.assertFalse(self.ls._get_lock()._is_owned()) |
|
1491 |
self.ls.release() |
|
1492 |
self.assertFalse(self.ls._is_owned()) |
|
1493 |
self.assertFalse(self.ls._get_lock()._is_owned()) |
|
1494 |
# Can't downgrade after releasing |
|
1495 |
self.assertRaises(AssertionError, self.ls.downgrade, "two") |
|
1496 |
|
|
1497 |
def testDowngrade(self): |
|
1498 |
# Not owning anything, must raise an exception |
|
1499 |
self.assertFalse(self.ls._is_owned()) |
|
1500 |
self.assertRaises(AssertionError, self.ls.downgrade) |
|
1501 |
|
|
1502 |
self.assertFalse(compat.any(i._is_owned() |
|
1503 |
for i in self.ls._get_lockdict().values())) |
|
1504 |
|
|
1505 |
self.assertEquals(self.ls.acquire(None, shared=0), |
|
1506 |
set(["one", "two", "three"])) |
|
1507 |
self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock") |
|
1508 |
|
|
1509 |
self.assertTrue(self.ls._get_lock()._is_owned(shared=0)) |
|
1510 |
self.assertTrue(compat.all(i._is_owned(shared=0) |
|
1511 |
for i in self.ls._get_lockdict().values())) |
|
1512 |
|
|
1513 |
# Start downgrading locks |
|
1514 |
self.assertTrue(self.ls.downgrade(names=["one"])) |
|
1515 |
self.assertTrue(self.ls._get_lock()._is_owned(shared=0)) |
|
1516 |
self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")]) |
|
1517 |
for name, lock in |
|
1518 |
self.ls._get_lockdict().items())) |
|
1519 |
|
|
1520 |
self.assertTrue(self.ls.downgrade(names="two")) |
|
1521 |
self.assertTrue(self.ls._get_lock()._is_owned(shared=0)) |
|
1522 |
should_share = lambda name: [0, 1][int(name in ("one", "two"))] |
|
1523 |
self.assertTrue(compat.all(lock._is_owned(shared=should_share(name)) |
|
1524 |
for name, lock in |
|
1525 |
self.ls._get_lockdict().items())) |
|
1526 |
|
|
1527 |
# Downgrading the last exclusive lock to shared must downgrade the |
|
1528 |
# lockset-internal lock too |
|
1529 |
self.assertTrue(self.ls.downgrade(names="three")) |
|
1530 |
self.assertTrue(self.ls._get_lock()._is_owned(shared=1)) |
|
1531 |
self.assertTrue(compat.all(i._is_owned(shared=1) |
|
1532 |
for i in self.ls._get_lockdict().values())) |
|
1533 |
|
|
1534 |
# Downgrading a shared lock must be a no-op |
|
1535 |
self.assertTrue(self.ls.downgrade(names=["one", "three"])) |
|
1536 |
self.assertTrue(self.ls._get_lock()._is_owned(shared=1)) |
|
1537 |
self.assertTrue(compat.all(i._is_owned(shared=1) |
|
1538 |
for i in self.ls._get_lockdict().values())) |
|
1539 |
|
|
1540 |
self.ls.release() |
|
1541 |
|
|
1377 | 1542 |
def testPriority(self): |
1378 | 1543 |
def _Acquire(prev, next, name, priority, success_fn): |
1379 | 1544 |
prev.wait() |
Also available in: Unified diff