Revision 3dbe3ddf

b/lib/locking.py
727 727
    finally:
728 728
      self.__lock.release()
729 729

  
730
  def downgrade(self):
731
    """Changes the lock mode from exclusive to shared.
732

  
733
    Pending acquires in shared mode on the same priority will go ahead.
734

  
735
    """
736
    self.__lock.acquire()
737
    try:
738
      assert self.__is_owned(), "Lock must be owned"
739

  
740
      if self.__is_exclusive():
741
        # Do nothing if the lock is already acquired in shared mode
742
        self.__exc = None
743
        self.__do_acquire(1)
744

  
745
        # Important: pending shared acquires should only jump ahead if there
746
        # was a transition from exclusive to shared, otherwise an owner of a
747
        # shared lock can keep calling this function to push incoming shared
748
        # acquires
749
        (priority, prioqueue) = self.__find_first_pending_queue()
750
        if prioqueue:
751
          # Is there a pending shared acquire on this priority?
752
          cond = self.__pending_shared.pop(priority, None)
753
          if cond:
754
            assert cond.shared
755
            assert cond in prioqueue
756

  
757
            # Ensure shared acquire is on top of queue
758
            if len(prioqueue) > 1:
759
              prioqueue.remove(cond)
760
              prioqueue.insert(0, cond)
761

  
762
            # Notify
763
            cond.notifyAll()
764

  
765
      assert not self.__is_exclusive()
766
      assert self.__is_sharer()
767

  
768
      return True
769
    finally:
770
      self.__lock.release()
771

  
730 772
  def release(self):
731 773
    """Release a Shared Lock.
732 774

  
......
881 923
    """
882 924
    return "%s/%s" % (self.name, mname)
883 925

  
926
  def _get_lock(self):
927
    """Returns the lockset-internal lock.
928

  
929
    """
930
    return self.__lock
931

  
932
  def _get_lockdict(self):
933
    """Returns the lockset-internal lock dictionary.
934

  
935
    Accessing this structure is only safe in single-thread usage or when the
936
    lockset-internal lock is held.
937

  
938
    """
939
    return self.__lockdict
940

  
884 941
  def _is_owned(self):
885 942
    """Is the current thread a current level owner?"""
886 943
    return threading.currentThread() in self.__owners
......
1122 1179

  
1123 1180
    return acquired
1124 1181

  
1182
  def downgrade(self, names=None):
1183
    """Downgrade a set of resource locks from exclusive to shared mode.
1184

  
1185
    The locks must have been acquired in exclusive mode.
1186

  
1187
    """
1188
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1189
                              " lock" % self.name)
1190

  
1191
    # Support passing in a single resource to downgrade rather than many
1192
    if isinstance(names, basestring):
1193
      names = [names]
1194

  
1195
    owned = self._list_owned()
1196

  
1197
    if names is None:
1198
      names = owned
1199
    else:
1200
      names = set(names)
1201
      assert owned.issuperset(names), \
1202
        ("downgrade() on unheld resources %s (set %s)" %
1203
         (names.difference(owned), self.name))
1204

  
1205
    for lockname in names:
1206
      self.__lockdict[lockname].downgrade()
1207

  
1208
    # Do we own the lockset in exclusive mode?
1209
    if self.__lock._is_owned(shared=0):
1210
      # Have all locks been downgraded?
1211
      if not compat.any(lock._is_owned(shared=0)
1212
                        for lock in self.__lockdict.values()):
1213
        self.__lock.downgrade()
1214
        assert self.__lock._is_owned(shared=1)
1215

  
1216
    return True
1217

  
1125 1218
  def release(self, names=None):
1126 1219
    """Release a set of resource locks, at the same level.
1127 1220

  
......
1458 1551
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1459 1552
                                         priority=priority)
1460 1553

  
1554
  def downgrade(self, level, names=None):
1555
    """Downgrade a set of resource locks from exclusive to shared mode.
1556

  
1557
    You must have acquired the locks in exclusive mode.
1558

  
1559
    @type level: member of locking.LEVELS
1560
    @param level: the level at which the locks shall be downgraded
1561
    @type names: list of strings, or None
1562
    @param names: the names of the locks which shall be downgraded
1563
        (defaults to all the locks acquired at the level)
1564

  
1565
    """
1566
    assert level in LEVELS, "Invalid locking level %s" % level
1567

  
1568
    return self.__keyring[level].downgrade(names=names)
1569

  
1461 1570
  def release(self, level, names=None):
1462 1571
    """Release a set of resource locks, at the same level.
1463 1572

  
b/test/ganeti.locking_unittest.py
612 612

  
613 613
    self.assertRaises(Queue.Empty, self.done.get_nowait)
614 614

  
615
  def testIllegalDowngrade(self):
616
    # Not yet acquired
617
    self.assertRaises(AssertionError, self.sl.downgrade)
618

  
619
    # Acquire in shared mode, downgrade should be no-op
620
    self.assertTrue(self.sl.acquire(shared=1))
621
    self.assertTrue(self.sl._is_owned(shared=1))
622
    self.assertTrue(self.sl.downgrade())
623
    self.assertTrue(self.sl._is_owned(shared=1))
624
    self.sl.release()
625

  
626
  def testDowngrade(self):
627
    self.assertTrue(self.sl.acquire())
628
    self.assertTrue(self.sl._is_owned(shared=0))
629
    self.assertTrue(self.sl.downgrade())
630
    self.assertTrue(self.sl._is_owned(shared=1))
631
    self.sl.release()
632

  
633
  @_Repeat
634
  def testDowngradeJumpsAheadOfExclusive(self):
635
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636
      self.assertTrue(self.sl.acquire())
637
      self.assertTrue(self.sl._is_owned(shared=0))
638
      ev_got.set()
639
      ev_downgrade.wait()
640
      self.assertTrue(self.sl._is_owned(shared=0))
641
      self.assertTrue(self.sl.downgrade())
642
      self.assertTrue(self.sl._is_owned(shared=1))
643
      ev_release.wait()
644
      self.assertTrue(self.sl._is_owned(shared=1))
645
      self.sl.release()
646

  
647
    def _KeepExclusive2(ev_started, ev_release):
648
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649
      self.assertTrue(self.sl._is_owned(shared=0))
650
      ev_release.wait()
651
      self.assertTrue(self.sl._is_owned(shared=0))
652
      self.sl.release()
653

  
654
    def _KeepShared(ev_started, ev_got, ev_release):
655
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656
      self.assertTrue(self.sl._is_owned(shared=1))
657
      ev_got.set()
658
      ev_release.wait()
659
      self.assertTrue(self.sl._is_owned(shared=1))
660
      self.sl.release()
661

  
662
    # Acquire lock in exclusive mode
663
    ev_got_excl1 = threading.Event()
664
    ev_downgrade_excl1 = threading.Event()
665
    ev_release_excl1 = threading.Event()
666
    th_excl1 = self._addThread(target=_KeepExclusive,
667
                               args=(ev_got_excl1, ev_downgrade_excl1,
668
                                     ev_release_excl1))
669
    ev_got_excl1.wait()
670

  
671
    # Start a second exclusive acquire
672
    ev_started_excl2 = threading.Event()
673
    ev_release_excl2 = threading.Event()
674
    th_excl2 = self._addThread(target=_KeepExclusive2,
675
                               args=(ev_started_excl2, ev_release_excl2))
676
    ev_started_excl2.wait()
677

  
678
    # Start shared acquires, will jump ahead of second exclusive acquire when
679
    # first exclusive acquire downgrades
680
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681
    ev_release_shared = threading.Event()
682

  
683
    th_shared = [self._addThread(target=_KeepShared,
684
                                 args=(ev_started, ev_got, ev_release_shared))
685
                 for (ev_started, ev_got) in ev_shared]
686

  
687
    # Wait for all shared acquires to start
688
    for (ev, _) in ev_shared:
689
      ev.wait()
690

  
691
    # Check lock information
692
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693
                     (self.sl.name, "exclusive", [th_excl1.getName()], None))
694
    (_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
695
    self.assertEqual([(pendmode, sorted(waiting))
696
                      for (pendmode, waiting) in pending],
697
                     [("exclusive", [th_excl2.getName()]),
698
                      ("shared", sorted(th.getName() for th in th_shared))])
699

  
700
    # Shared acquires won't start until the exclusive lock is downgraded
701
    ev_downgrade_excl1.set()
702

  
703
    # Wait for all shared acquires to be successful
704
    for (_, ev) in ev_shared:
705
      ev.wait()
706

  
707
    # Check lock information again
708
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])),
709
                     (self.sl.name, "shared", None,
710
                      [("exclusive", [th_excl2.getName()])]))
711
    (_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER]))
712
    self.assertEqual(set(owner), set([th_excl1.getName()] +
713
                                     [th.getName() for th in th_shared]))
714

  
715
    ev_release_excl1.set()
716
    ev_release_excl2.set()
717
    ev_release_shared.set()
718

  
719
    self._waitThreads()
720

  
721
    self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER,
722
                                          query.LQ_PENDING])),
723
                     (self.sl.name, None, None, []))
724

  
615 725
  @_Repeat
616 726
  def testMixedAcquireTimeout(self):
617 727
    sync = threading.Event()
......
1374 1484
    self.assertEqual(self.done.get_nowait(), 'DONE')
1375 1485
    self._setUpLS()
1376 1486

  
1487
  def testAcquireWithNamesDowngrade(self):
1488
    self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1489
    self.assertTrue(self.ls._is_owned())
1490
    self.assertFalse(self.ls._get_lock()._is_owned())
1491
    self.ls.release()
1492
    self.assertFalse(self.ls._is_owned())
1493
    self.assertFalse(self.ls._get_lock()._is_owned())
1494
    # Can't downgrade after releasing
1495
    self.assertRaises(AssertionError, self.ls.downgrade, "two")
1496

  
1497
  def testDowngrade(self):
1498
    # Not owning anything, must raise an exception
1499
    self.assertFalse(self.ls._is_owned())
1500
    self.assertRaises(AssertionError, self.ls.downgrade)
1501

  
1502
    self.assertFalse(compat.any(i._is_owned()
1503
                                for i in self.ls._get_lockdict().values()))
1504

  
1505
    self.assertEquals(self.ls.acquire(None, shared=0),
1506
                      set(["one", "two", "three"]))
1507
    self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1508

  
1509
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1510
    self.assertTrue(compat.all(i._is_owned(shared=0)
1511
                               for i in self.ls._get_lockdict().values()))
1512

  
1513
    # Start downgrading locks
1514
    self.assertTrue(self.ls.downgrade(names=["one"]))
1515
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1516
    self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1517
                               for name, lock in
1518
                                 self.ls._get_lockdict().items()))
1519

  
1520
    self.assertTrue(self.ls.downgrade(names="two"))
1521
    self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1522
    should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1523
    self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1524
                               for name, lock in
1525
                                 self.ls._get_lockdict().items()))
1526

  
1527
    # Downgrading the last exclusive lock to shared must downgrade the
1528
    # lockset-internal lock too
1529
    self.assertTrue(self.ls.downgrade(names="three"))
1530
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1531
    self.assertTrue(compat.all(i._is_owned(shared=1)
1532
                               for i in self.ls._get_lockdict().values()))
1533

  
1534
    # Downgrading a shared lock must be a no-op
1535
    self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1536
    self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1537
    self.assertTrue(compat.all(i._is_owned(shared=1)
1538
                               for i in self.ls._get_lockdict().values()))
1539

  
1540
    self.ls.release()
1541

  
1377 1542
  def testPriority(self):
1378 1543
    def _Acquire(prev, next, name, priority, success_fn):
1379 1544
      prev.wait()

Also available in: Unified diff