Merge branch 'devel-2.5'
[ganeti-local] / lib / locking.py
index ee8db70..ea044d5 100644 (file)
@@ -33,6 +33,7 @@ import weakref
 import logging
 import heapq
 import itertools
+import time
 
 from ganeti import errors
 from ganeti import utils
@@ -404,12 +405,13 @@ class SharedLock(object):
     "__pending_by_prio",
     "__pending_shared",
     "__shr",
+    "__time_fn",
     "name",
     ]
 
   __condition_class = _PipeConditionWithMode
 
-  def __init__(self, name, monitor=None):
+  def __init__(self, name, monitor=None, _time_fn=time.time):
     """Construct a new SharedLock.
 
     @param name: the name of the lock
@@ -421,6 +423,9 @@ class SharedLock(object):
 
     self.name = name
 
+    # Used for unittesting
+    self.__time_fn = _time_fn
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -684,24 +689,27 @@ class SharedLock(object):
         assert priority not in self.__pending_shared
         self.__pending_shared[priority] = wait_condition
 
+    wait_start = self.__time_fn()
+    acquired = False
+
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
-      # TODO: Decrease timeout with spurious notifications
-      while not (self.__is_on_top(wait_condition) and
-                 self.__can_acquire(shared)):
-        # Wait for notification
-        wait_condition.wait(timeout)
-        self.__check_deleted()
+      while True:
+        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+          self.__do_acquire(shared)
+          acquired = True
+          break
 
-        # A lot of code assumes blocking acquires always succeed. Loop
-        # internally for that case.
-        if timeout is not None:
+        # A lot of code assumes blocking acquires always succeed, therefore we
+        # can never return False for a blocking acquire
+        if (timeout is not None and
+            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
           break
 
-      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
-        self.__do_acquire(shared)
-        return True
+        # Wait for notification
+        wait_condition.wait(timeout)
+        self.__check_deleted()
     finally:
       # Remove condition from queue if there are no more waiters
       if not wait_condition.has_waiting():
@@ -711,7 +719,7 @@ class SharedLock(object):
           # (e.g. on lock deletion)
           self.__pending_shared.pop(priority, None)
 
-    return False
+    return acquired
 
   def acquire(self, shared=0, timeout=None, priority=None,
               test_notify=None):
@@ -798,19 +806,38 @@ class SharedLock(object):
       # Autodetect release type
       if self.__is_exclusive():
         self.__exc = None
+        notify = True
       else:
         self.__shr.remove(threading.currentThread())
+        notify = not self.__shr
 
-      # Notify topmost condition in queue
-      (priority, prioqueue) = self.__find_first_pending_queue()
-      if prioqueue:
-        cond = prioqueue[0]
-        cond.notifyAll()
-        if cond.shared:
-          # Prevent further shared acquires from sneaking in while waiters are
-          # notified
-          self.__pending_shared.pop(priority, None)
+      # Notify topmost condition in queue if there are no owners left (for
+      # shared locks)
+      if notify:
+        self.__notify_topmost()
+    finally:
+      self.__lock.release()
+
+  def __notify_topmost(self):
+    """Notifies topmost condition in queue of pending acquires.
+
+    """
+    (priority, prioqueue) = self.__find_first_pending_queue()
+    if prioqueue:
+      cond = prioqueue[0]
+      cond.notifyAll()
+      if cond.shared:
+        # Prevent further shared acquires from sneaking in while waiters are
+        # notified
+        self.__pending_shared.pop(priority, None)
+
+  def _notify_topmost(self):
+    """Exported version of L{__notify_topmost}.
 
+    """
+    self.__lock.acquire()
+    try:
+      return self.__notify_topmost()
     finally:
       self.__lock.release()
 
@@ -842,10 +869,10 @@ class SharedLock(object):
       if not acquired:
         acquired = self.__acquire_unlocked(0, timeout, priority)
 
+      if acquired:
         assert self.__is_exclusive() and not self.__is_sharer(), \
           "Lock wasn't acquired in exclusive mode"
 
-      if acquired:
         self.__deleted = True
         self.__exc = None