Fix burnin error when trying to grow a file volume
[ganeti-local] / lib / locking.py
index adc7a23..aeafd3f 100644 (file)
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
-# Disable "Invalid name ..." message
-# pylint: disable-msg=C0103
-
 """Module implementing the Ganeti locking code."""
 
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
 import os
 import select
 import threading
@@ -52,6 +54,59 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
+class RunningTimeout(object):
+  """Class to calculate remaining timeout when doing several operations.
+
+  """
+  __slots__ = [
+    "_allow_negative",
+    "_start_time",
+    "_time_fn",
+    "_timeout",
+    ]
+
+  def __init__(self, timeout, allow_negative, _time_fn=time.time):
+    """Initializes this class.
+
+    @type timeout: float
+    @param timeout: Timeout duration
+    @type allow_negative: bool
+    @param allow_negative: Whether to return values below zero
+    @param _time_fn: Time function for unittests
+
+    """
+    object.__init__(self)
+
+    if timeout is not None and timeout < 0.0:
+      raise ValueError("Timeout must not be negative")
+
+    self._timeout = timeout
+    self._allow_negative = allow_negative
+    self._time_fn = _time_fn
+
+    self._start_time = None
+
+  def Remaining(self):
+    """Returns the remaining timeout.
+
+    """
+    if self._timeout is None:
+      return None
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = self._time_fn()
+
+    # Calculate remaining time
+    remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+    if not self._allow_negative:
+      # Ensure timeout is always >= 0
+      return max(0.0, remaining_timeout)
+
+    return remaining_timeout
+
+
 class _SingleNotifyPipeConditionWaiter(object):
   """Helper class for SingleNotifyPipeCondition
 
@@ -81,10 +136,18 @@ class _SingleNotifyPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    start_time = time.time()
-    remaining_time = timeout
+    running_timeout = RunningTimeout(timeout, True)
+
+    while True:
+      remaining_time = running_timeout.Remaining()
+
+      if remaining_time is not None:
+        if remaining_time < 0.0:
+          break
+
+        # Our calculation uses seconds, poll() wants milliseconds
+        remaining_time *= 1000
 
-    while timeout is None or remaining_time > 0:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
@@ -96,10 +159,6 @@ class _SingleNotifyPipeConditionWaiter(object):
       if result and result[0][0] == self._fd:
         break
 
-      # Re-calculate timeout if necessary
-      if timeout is not None:
-        remaining_time = start_time + timeout - time.time()
-
 
 class _BaseCondition(object):
   """Base class containing common code for conditions.
@@ -161,7 +220,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
 
   """
 
-  __slots__ = _BaseCondition.__slots__ + [
+  __slots__ = [
     "_poller",
     "_read_fd",
     "_write_fd",
@@ -232,7 +291,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
       if self._nwaiters == 0:
         self._Cleanup()
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Close the writing side of the pipe to notify all waiters.
 
     """
@@ -254,7 +313,7 @@ class PipeCondition(_BaseCondition):
   there are any waiting threads.
 
   """
-  __slots__ = _BaseCondition.__slots__ + [
+  __slots__ = [
     "_nwaiters",
     "_single_condition",
     ]
@@ -290,7 +349,7 @@ class PipeCondition(_BaseCondition):
       assert self._nwaiters > 0
       self._nwaiters -= 1
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
     """
@@ -327,7 +386,7 @@ class _CountingCondition(object):
     self._cond = threading.Condition(lock=lock)
     self._nwaiters = 0
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notifies the condition.
 
     """
@@ -545,7 +604,7 @@ class SharedLock(object):
   def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
-    @type shared: int
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
@@ -658,6 +717,7 @@ class LockSet:
   def __init__(self, members=None):
     """Constructs a new LockSet.
 
+    @type members: list of strings
     @param members: initial members of the set
 
     """
@@ -699,6 +759,9 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
+    assert not (name is None and self.__lock._is_owned()), \
+           "Cannot hold internal lock when deleting owner status"
+
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
@@ -717,7 +780,9 @@ class LockSet:
   def _release_and_delete_owned(self):
     """Release and delete all resources owned by the current thread"""
     for lname in self._list_owned():
-      self.__lockdict[lname].release()
+      lock = self.__lockdict[lname]
+      if lock._is_owned():
+        lock.release()
       self._del_owned(name=lname)
 
   def __names(self):
@@ -751,8 +816,10 @@ class LockSet:
   def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float or None
@@ -774,24 +841,16 @@ class LockSet:
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
-    remaining_timeout = timeout
-    if timeout is None:
-      start = None
-      calc_remaining_timeout = lambda: None
-    else:
-      start = time.time()
-      calc_remaining_timeout = lambda: (start + timeout) - time.time()
+    running_timeout = RunningTimeout(timeout, False)
 
     try:
       if names is not None:
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
           names = [names]
-        else:
-          names = sorted(names)
 
         return self.__acquire_inner(names, False, shared,
-                                    calc_remaining_timeout, test_notify)
+                                    running_timeout.Remaining, test_notify)
 
       else:
         # If no names are given acquire the whole set by not letting new names
@@ -804,14 +863,14 @@ class LockSet:
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
         if not self.__lock.acquire(shared=shared,
-                                   timeout=calc_remaining_timeout()):
+                                   timeout=running_timeout.Remaining()):
           raise _AcquireTimeout()
         try:
           # note we own the set-lock
           self._add_owned()
 
           return self.__acquire_inner(self.__names(), True, shared,
-                                      calc_remaining_timeout, test_notify)
+                                      running_timeout.Remaining, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
@@ -824,18 +883,24 @@ class LockSet:
       return None
 
   def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
-    """
+    """Inner logic for acquiring a number of locks.
+
+    @param names: Names of the locks to be acquired
+    @param want_all: Whether all locks in the set should be acquired
+    @param shared: Whether to acquire in shared mode
+    @param timeout_fn: Function returning remaining timeout
+    @param test_notify: Special callback function for unittesting
 
     """
     acquire_list = []
 
     # First we look the locks up on __lockdict. We have no way of being sure
     # they will still be there after, but this makes it a lot faster should
-    # just one of them be the already wrong
-    for lname in utils.UniqueSequence(names):
+    # just one of them be the already wrong. Using a sorted sequence to prevent
+    # deadlocks.
+    for lname in sorted(utils.UniqueSequence(names)):
       try:
         lock = self.__lockdict[lname] # raises KeyError if lock is not there
-        acquire_list.append((lname, lock))
       except KeyError:
         if want_all:
           # We are acquiring all the set, it doesn't matter if this particular
@@ -844,6 +909,8 @@ class LockSet:
 
         raise errors.LockError("Non-existing lock in set (%s)" % lname)
 
+      acquire_list.append((lname, lock))
+
     # This will hold the locknames we effectively acquired.
     acquired = set()
 
@@ -860,8 +927,6 @@ class LockSet:
           test_notify_fn = None
 
         timeout = timeout_fn()
-        if timeout is not None and timeout < 0:
-          raise _AcquireTimeout()
 
         try:
           # raises LockError if the lock was deleted
@@ -910,6 +975,7 @@ class LockSet:
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
@@ -943,8 +1009,11 @@ class LockSet:
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
+    @type names: list of strings
     @param names: names of the new elements to add
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: pre-acquire the new resource?
+    @type shared: integer (0/1) used as a boolean
     @param shared: is the pre-acquisition shared?
 
     """
@@ -1004,6 +1073,7 @@ class LockSet:
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
+    @type names: list of strings
     @param names: names of the resource to remove.
 
     @return: a list of locks which we removed; the list is always
@@ -1148,7 +1218,7 @@ class GanetiLockManager:
     # the test cases.
     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self):
+  def _BGL_owned(self): # pylint: disable-msg=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
@@ -1156,7 +1226,8 @@ class GanetiLockManager:
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
-  def _contains_BGL(self, level, names):
+  @staticmethod
+  def _contains_BGL(level, names): # pylint: disable-msg=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1168,10 +1239,12 @@ class GanetiLockManager:
   def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
-    @param level: the level at which the locks shall be acquired;
-        it must be a member of LEVELS.
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be acquired
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
     @type timeout: float
@@ -1202,8 +1275,9 @@ class GanetiLockManager:
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
-    @param level: the level at which the locks shall be released;
-        it must be a member of LEVELS
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be released
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -1212,7 +1286,9 @@ class GanetiLockManager:
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels")
+            " at upper levels (%r)" %
+            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+                              for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1220,10 +1296,13 @@ class GanetiLockManager:
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
-    @param level: the level at which the locks shall be added;
-        it must be a member of LEVELS_MOD.
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be added
+    @type names: list of strings
     @param names: names of the locks to acquire
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: whether to acquire the newly added locks
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether the acquisition will be shared
 
     """
@@ -1240,8 +1319,9 @@ class GanetiLockManager:
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
-    @param level: the level at which the locks shall be removed;
-        it must be a member of LEVELS_MOD
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be removed
+    @type names: list of strings
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)