Merge branch 'stable-2.6'
[ganeti-local] / lib / locking.py
index a1c78f4..ea044d5 100644 (file)
@@ -33,6 +33,7 @@ import weakref
 import logging
 import heapq
 import itertools
+import time
 
 from ganeti import errors
 from ganeti import utils
@@ -357,6 +358,11 @@ class PipeCondition(_BaseCondition):
 
     return bool(self._waiters)
 
+  def __repr__(self):
+    return ("<%s.%s waiters=%s at %#x>" %
+            (self.__class__.__module__, self.__class__.__name__,
+             self._waiters, id(self)))
+
 
 class _PipeConditionWithMode(PipeCondition):
   __slots__ = [
@@ -399,12 +405,13 @@ class SharedLock(object):
     "__pending_by_prio",
     "__pending_shared",
     "__shr",
+    "__time_fn",
     "name",
     ]
 
   __condition_class = _PipeConditionWithMode
 
-  def __init__(self, name, monitor=None):
+  def __init__(self, name, monitor=None, _time_fn=time.time):
     """Construct a new SharedLock.
 
     @param name: the name of the lock
@@ -416,6 +423,9 @@ class SharedLock(object):
 
     self.name = name
 
+    # Time source, injectable so unit tests can control the clock
+    self.__time_fn = _time_fn
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -436,6 +446,11 @@ class SharedLock(object):
       logging.debug("Adding lock %s to monitor", name)
       monitor.RegisterLock(self)
 
+  def __repr__(self):
+    return ("<%s.%s name=%s at %#x>" %
+            (self.__class__.__module__, self.__class__.__name__,
+             self.name, id(self)))
+
   def GetLockInfo(self, requested):
     """Retrieves information for querying locks.
 
@@ -674,24 +689,27 @@ class SharedLock(object):
         assert priority not in self.__pending_shared
         self.__pending_shared[priority] = wait_condition
 
+    wait_start = self.__time_fn()
+    acquired = False
+
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
-      # TODO: Decrease timeout with spurious notifications
-      while not (self.__is_on_top(wait_condition) and
-                 self.__can_acquire(shared)):
-        # Wait for notification
-        wait_condition.wait(timeout)
-        self.__check_deleted()
+      while True:
+        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+          self.__do_acquire(shared)
+          acquired = True
+          break
 
-        # A lot of code assumes blocking acquires always succeed. Loop
-        # internally for that case.
-        if timeout is not None:
+        # A lot of code assumes blocking acquires always succeed, therefore we
+        # can never return False for a blocking acquire
+        if (timeout is not None and
+            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
           break
 
-      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
-        self.__do_acquire(shared)
-        return True
+        # Wait for notification
+        wait_condition.wait(timeout)
+        self.__check_deleted()
     finally:
       # Remove condition from queue if there are no more waiters
       if not wait_condition.has_waiting():
@@ -701,7 +719,7 @@ class SharedLock(object):
           # (e.g. on lock deletion)
           self.__pending_shared.pop(priority, None)
 
-    return False
+    return acquired
 
   def acquire(self, shared=0, timeout=None, priority=None,
               test_notify=None):
@@ -788,19 +806,38 @@ class SharedLock(object):
       # Autodetect release type
       if self.__is_exclusive():
         self.__exc = None
+        notify = True
       else:
         self.__shr.remove(threading.currentThread())
+        notify = not self.__shr
 
-      # Notify topmost condition in queue
-      (priority, prioqueue) = self.__find_first_pending_queue()
-      if prioqueue:
-        cond = prioqueue[0]
-        cond.notifyAll()
-        if cond.shared:
-          # Prevent further shared acquires from sneaking in while waiters are
-          # notified
-          self.__pending_shared.pop(priority, None)
+      # Notify topmost condition in queue; when releasing a shared hold, only
+      # do so once no sharers are left
+      if notify:
+        self.__notify_topmost()
+    finally:
+      self.__lock.release()
+
+  def __notify_topmost(self):
+    """Notifies topmost condition in queue of pending acquires.
+
+    """
+    (priority, prioqueue) = self.__find_first_pending_queue()
+    if prioqueue:
+      cond = prioqueue[0]
+      cond.notifyAll()
+      if cond.shared:
+        # Prevent further shared acquires from sneaking in while waiters are
+        # notified
+        self.__pending_shared.pop(priority, None)
 
+  def _notify_topmost(self):
+    """Exported version of L{__notify_topmost}; takes the internal lock.
+
+    """
+    self.__lock.acquire()
+    try:
+      return self.__notify_topmost()
     finally:
       self.__lock.release()
 
@@ -832,10 +869,10 @@ class SharedLock(object):
       if not acquired:
         acquired = self.__acquire_unlocked(0, timeout, priority)
 
+      if acquired:
         assert self.__is_exclusive() and not self.__is_sharer(), \
           "Lock wasn't acquired in exclusive mode"
 
-      if acquired:
         self.__deleted = True
         self.__exc = None
 
@@ -943,9 +980,45 @@ class LockSet:
     return self.__lockdict
 
   def is_owned(self):
-    """Is the current thread a current level owner?"""
+    """Is the current thread a current level owner?
+
+    @note: Use L{check_owned} to check if a specific lock is held
+
+    """
     return threading.currentThread() in self.__owners
 
+  def check_owned(self, names, shared=-1):
+    """Check if locks are owned in a specific mode.
+
+    @type names: sequence or string
+    @param names: Lock names (or a single lock name)
+    @param shared: See L{SharedLock.is_owned}
+    @rtype: bool
+    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
+      L{list_owned} to get the names of all owned locks
+
+    """
+    if isinstance(names, basestring):
+      names = [names]
+
+    # Avoid check if no locks are owned anyway
+    if names and self.is_owned():
+      candidates = []
+
+      # Gather references to all locks (in case they're deleted in the meantime)
+      for lname in names:
+        try:
+          lock = self.__lockdict[lname]
+        except KeyError:
+          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
+                                 " have been removed)" % (lname, self.name))
+        else:
+          candidates.append(lock)
+
+      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
+    else:
+      return False
+
   def _add_owned(self, name=None):
     """Note the current thread owns the given lock"""
     if name is None:
@@ -1419,7 +1492,7 @@ LEVEL_NAMES = {
   }
 
 # Constant for the big ganeti lock
-BGL = 'BGL'
+BGL = "BGL"
 
 
 class GanetiLockManager:
@@ -1481,14 +1554,6 @@ class GanetiLockManager:
     """
     return self._monitor.QueryLocks(fields)
 
-  def OldStyleQueryLocks(self, fields):
-    """Queries information from all locks, returning old-style data.
-
-    See L{LockMonitor.OldStyleQueryLocks}.
-
-    """
-    return self._monitor.OldStyleQueryLocks(fields)
-
   def _names(self, level):
     """List the lock names at the given level.
 
@@ -1512,6 +1577,14 @@ class GanetiLockManager:
     """
     return self.__keyring[level].list_owned()
 
+  def check_owned(self, level, names, shared=-1):
+    """Check if locks at a certain level are owned in a specific mode.
+
+    @see: L{LockSet.check_owned}
+
+    """
+    return self.__keyring[level].check_owned(names, shared=shared)
+
   def _upper_owned(self, level):
     """Check that we don't own any lock at a level greater than the given one.
 
@@ -1754,14 +1827,3 @@ class LockMonitor(object):
 
     # Prepare query response
     return query.GetQueryResponse(qobj, ctx)
-
-  def OldStyleQueryLocks(self, fields):
-    """Queries information from all locks, returning old-style data.
-
-    @type fields: list of strings
-    @param fields: List of fields to return
-
-    """
-    (qobj, ctx) = self._Query(fields)
-
-    return qobj.OldStyleQuery(ctx)