Revision 8d7d8b57 lib/locking.py

b/lib/locking.py
33 33
import logging
34 34
import heapq
35 35
import itertools
36
import time
36 37

  
37 38
from ganeti import errors
38 39
from ganeti import utils
......
404 405
    "__pending_by_prio",
405 406
    "__pending_shared",
406 407
    "__shr",
408
    "__time_fn",
407 409
    "name",
408 410
    ]
409 411

  
410 412
  __condition_class = _PipeConditionWithMode
411 413

  
412
  def __init__(self, name, monitor=None):
414
  def __init__(self, name, monitor=None, _time_fn=time.time):
413 415
    """Construct a new SharedLock.
414 416

  
415 417
    @param name: the name of the lock
......
421 423

  
422 424
    self.name = name
423 425

  
426
    # Used for unittesting
427
    self.__time_fn = _time_fn
428

  
424 429
    # Internal lock
425 430
    self.__lock = threading.Lock()
426 431

  
......
682 687
        assert priority not in self.__pending_shared
683 688
        self.__pending_shared[priority] = wait_condition
684 689

  
690
    wait_start = self.__time_fn()
691
    acquired = False
692

  
685 693
    try:
686 694
      # Wait until we become the topmost acquire in the queue or the timeout
687 695
      # expires.
688
      # TODO: Decrease timeout with spurious notifications
689
      while not (self.__is_on_top(wait_condition) and
690
                 self.__can_acquire(shared)):
691
        # Wait for notification
692
        wait_condition.wait(timeout)
693
        self.__check_deleted()
696
      while True:
697
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
698
          self.__do_acquire(shared)
699
          acquired = True
700
          break
694 701

  
695
        # A lot of code assumes blocking acquires always succeed. Loop
696
        # internally for that case.
697
        if timeout is not None:
702
        # A lot of code assumes blocking acquires always succeed, therefore we
703
        # can never return False for a blocking acquire
704
        if (timeout is not None and
705
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
698 706
          break
699 707

  
700
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
701
        self.__do_acquire(shared)
702
        return True
708
        # Wait for notification
709
        wait_condition.wait(timeout)
710
        self.__check_deleted()
703 711
    finally:
704 712
      # Remove condition from queue if there are no more waiters
705 713
      if not wait_condition.has_waiting():
......
709 717
          # (e.g. on lock deletion)
710 718
          self.__pending_shared.pop(priority, None)
711 719

  
712
    return False
720
    return acquired
713 721

  
714 722
  def acquire(self, shared=0, timeout=None, priority=None,
715 723
              test_notify=None):
......
800 808
        self.__shr.remove(threading.currentThread())
801 809

  
802 810
      # Notify topmost condition in queue
803
      (priority, prioqueue) = self.__find_first_pending_queue()
804
      if prioqueue:
805
        cond = prioqueue[0]
806
        cond.notifyAll()
807
        if cond.shared:
808
          # Prevent further shared acquires from sneaking in while waiters are
809
          # notified
810
          self.__pending_shared.pop(priority, None)
811
      self.__notify_topmost()
812
    finally:
813
      self.__lock.release()
814

  
815
  def __notify_topmost(self):
816
    """Notifies topmost condition in queue of pending acquires.
817

  
818
    """
819
    (priority, prioqueue) = self.__find_first_pending_queue()
820
    if prioqueue:
821
      cond = prioqueue[0]
822
      cond.notifyAll()
823
      if cond.shared:
824
        # Prevent further shared acquires from sneaking in while waiters are
825
        # notified
826
        self.__pending_shared.pop(priority, None)
827

  
828
  def _notify_topmost(self):
829
    """Exported version of L{__notify_topmost}.
811 830

  
831
    """
832
    self.__lock.acquire()
833
    try:
834
      return self.__notify_topmost()
812 835
    finally:
813 836
      self.__lock.release()
814 837

  

Also available in: Unified diff