Revision 8d7d8b57

b/lib/locking.py
33 33
import logging
34 34
import heapq
35 35
import itertools
36
import time
36 37

  
37 38
from ganeti import errors
38 39
from ganeti import utils
......
404 405
    "__pending_by_prio",
405 406
    "__pending_shared",
406 407
    "__shr",
408
    "__time_fn",
407 409
    "name",
408 410
    ]
409 411

  
410 412
  __condition_class = _PipeConditionWithMode
411 413

  
412
  def __init__(self, name, monitor=None):
414
  def __init__(self, name, monitor=None, _time_fn=time.time):
413 415
    """Construct a new SharedLock.
414 416

  
415 417
    @param name: the name of the lock
......
421 423

  
422 424
    self.name = name
423 425

  
426
    # Used for unittesting
427
    self.__time_fn = _time_fn
428

  
424 429
    # Internal lock
425 430
    self.__lock = threading.Lock()
426 431

  
......
682 687
        assert priority not in self.__pending_shared
683 688
        self.__pending_shared[priority] = wait_condition
684 689

  
690
    wait_start = self.__time_fn()
691
    acquired = False
692

  
685 693
    try:
686 694
      # Wait until we become the topmost acquire in the queue or the timeout
687 695
      # expires.
688
      # TODO: Decrease timeout with spurious notifications
689
      while not (self.__is_on_top(wait_condition) and
690
                 self.__can_acquire(shared)):
691
        # Wait for notification
692
        wait_condition.wait(timeout)
693
        self.__check_deleted()
696
      while True:
697
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
698
          self.__do_acquire(shared)
699
          acquired = True
700
          break
694 701

  
695
        # A lot of code assumes blocking acquires always succeed. Loop
696
        # internally for that case.
697
        if timeout is not None:
702
        # A lot of code assumes blocking acquires always succeed, therefore we
703
        # can never return False for a blocking acquire
704
        if (timeout is not None and
705
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
698 706
          break
699 707

  
700
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
701
        self.__do_acquire(shared)
702
        return True
708
        # Wait for notification
709
        wait_condition.wait(timeout)
710
        self.__check_deleted()
703 711
    finally:
704 712
      # Remove condition from queue if there are no more waiters
705 713
      if not wait_condition.has_waiting():
......
709 717
          # (e.g. on lock deletion)
710 718
          self.__pending_shared.pop(priority, None)
711 719

  
712
    return False
720
    return acquired
713 721

  
714 722
  def acquire(self, shared=0, timeout=None, priority=None,
715 723
              test_notify=None):
......
800 808
        self.__shr.remove(threading.currentThread())
801 809

  
802 810
      # Notify topmost condition in queue
803
      (priority, prioqueue) = self.__find_first_pending_queue()
804
      if prioqueue:
805
        cond = prioqueue[0]
806
        cond.notifyAll()
807
        if cond.shared:
808
          # Prevent further shared acquires from sneaking in while waiters are
809
          # notified
810
          self.__pending_shared.pop(priority, None)
811
      self.__notify_topmost()
812
    finally:
813
      self.__lock.release()
814

  
815
  def __notify_topmost(self):
816
    """Notifies topmost condition in queue of pending acquires.
817

  
818
    """
819
    (priority, prioqueue) = self.__find_first_pending_queue()
820
    if prioqueue:
821
      cond = prioqueue[0]
822
      cond.notifyAll()
823
      if cond.shared:
824
        # Prevent further shared acquires from sneaking in while waiters are
825
        # notified
826
        self.__pending_shared.pop(priority, None)
827

  
828
  def _notify_topmost(self):
829
    """Exported version of L{__notify_topmost}.
811 830

  
831
    """
832
    self.__lock.acquire()
833
    try:
834
      return self.__notify_topmost()
812 835
    finally:
813 836
      self.__lock.release()
814 837

  
b/test/ganeti.locking_unittest.py
953 953
                                       for i in sorted(perprio.keys())]
954 954
                      for (shared, _, threads) in acquires])
955 955

  
956
  class _FakeTimeForSpuriousNotifications:
957
    def __init__(self, now, check_end):
958
      self.now = now
959
      self.check_end = check_end
960

  
961
      # Deterministic random number generator
962
      self.rnd = random.Random(15086)
963

  
964
    def time(self):
965
      # Advance time if the random number generator thinks so (this is to test
966
      # multiple notifications without advancing the time)
967
      if self.rnd.random() < 0.3:
968
        self.now += self.rnd.random()
969

  
970
      self.check_end(self.now)
971

  
972
      return self.now
973

  
974
  @_Repeat
975
  def testAcquireTimeoutWithSpuriousNotifications(self):
976
    ready = threading.Event()
977
    locked = threading.Event()
978
    req = Queue.Queue(0)
979

  
980
    epoch = 4000.0
981
    timeout = 60.0
982

  
983
    def check_end(now):
984
      self.assertFalse(locked.isSet())
985

  
986
      # If we waited long enough (in virtual time), tell main thread to release
987
      # lock, otherwise tell it to notify once more
988
      req.put(now < (epoch + (timeout * 0.8)))
989

  
990
    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
991

  
992
    sl = locking.SharedLock("test", _time_fn=time_fn)
993

  
994
    # Acquire in exclusive mode
995
    sl.acquire(shared=0)
996

  
997
    def fn():
998
      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
999
                                 test_notify=ready.set))
1000
      locked.set()
1001
      sl.release()
1002
      self.done.put("success")
1003

  
1004
    # Start acquire with timeout and wait for it to be ready
1005
    self._addThread(target=fn)
1006
    ready.wait()
1007

  
1008
    # The separate thread is now waiting to acquire the lock, so start sending
1009
    # spurious notifications.
1010

  
1011
    # Wait for separate thread to ask for another notification
1012
    count = 0
1013
    while req.get():
1014
      # After sending the notification, the lock will take a short amount of
1015
      # time to notice and to retrieve the current time
1016
      sl._notify_topmost()
1017
      count += 1
1018

  
1019
    self.assertTrue(count > 100, "Not enough notifications were sent")
1020

  
1021
    self.assertFalse(locked.isSet())
1022

  
1023
    # Some notifications have been sent, now actually release the lock
1024
    sl.release()
1025

  
1026
    # Wait for lock to be acquired
1027
    locked.wait()
1028

  
1029
    self._waitThreads()
1030

  
1031
    self.assertEqual(self.done.get_nowait(), "success")
1032
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1033

  
956 1034

  
957 1035
class TestSharedLockInCondition(_ThreadedTestCase):
958 1036
  """SharedLock as a condition lock tests"""

Also available in: Unified diff