Reuse backend parameters from export
[ganeti-local] / lib / locking.py
index 382a45c..aeafd3f 100644 (file)
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
-# Disable "Invalid name ..." message
-# pylint: disable-msg=C0103
-
 """Module implementing the Ganeti locking code."""
 
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
 import os
 import select
 import threading
@@ -52,6 +54,59 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
+class RunningTimeout(object):
+  """Class to calculate remaining timeout when doing several operations.
+
+  """
+  __slots__ = [
+    "_allow_negative",
+    "_start_time",
+    "_time_fn",
+    "_timeout",
+    ]
+
+  def __init__(self, timeout, allow_negative, _time_fn=time.time):
+    """Initializes this class.
+
+    @type timeout: float
+    @param timeout: Timeout duration
+    @type allow_negative: bool
+    @param allow_negative: Whether to return values below zero
+    @param _time_fn: Time function for unittests
+
+    """
+    object.__init__(self)
+
+    if timeout is not None and timeout < 0.0:
+      raise ValueError("Timeout must not be negative")
+
+    self._timeout = timeout
+    self._allow_negative = allow_negative
+    self._time_fn = _time_fn
+
+    self._start_time = None
+
+  def Remaining(self):
+    """Returns the remaining timeout.
+
+    """
+    if self._timeout is None:
+      return None
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = self._time_fn()
+
+    # Calculate remaining time
+    remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+    if not self._allow_negative:
+      # Ensure timeout is always >= 0
+      return max(0.0, remaining_timeout)
+
+    return remaining_timeout
+
+
 class _SingleNotifyPipeConditionWaiter(object):
   """Helper class for SingleNotifyPipeCondition
 
@@ -81,10 +136,18 @@ class _SingleNotifyPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    start_time = time.time()
-    remaining_time = timeout
+    running_timeout = RunningTimeout(timeout, True)
+
+    while True:
+      remaining_time = running_timeout.Remaining()
+
+      if remaining_time is not None:
+        if remaining_time < 0.0:
+          break
+
+        # Our calculation uses seconds, poll() wants milliseconds
+        remaining_time *= 1000
 
-    while timeout is None or remaining_time > 0:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
@@ -96,10 +159,6 @@ class _SingleNotifyPipeConditionWaiter(object):
       if result and result[0][0] == self._fd:
         break
 
-      # Re-calculate timeout if necessary
-      if timeout is not None:
-        remaining_time = start_time + timeout - time.time()
-
 
 class _BaseCondition(object):
   """Base class containing common code for conditions.
@@ -161,7 +220,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
 
   """
 
-  __slots__ = _BaseCondition.__slots__ + [
+  __slots__ = [
     "_poller",
     "_read_fd",
     "_write_fd",
@@ -232,7 +291,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
       if self._nwaiters == 0:
         self._Cleanup()
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Close the writing side of the pipe to notify all waiters.
 
     """
@@ -254,7 +313,7 @@ class PipeCondition(_BaseCondition):
   there are any waiting threads.
 
   """
-  __slots__ = _BaseCondition.__slots__ + [
+  __slots__ = [
     "_nwaiters",
     "_single_condition",
     ]
@@ -290,7 +349,7 @@ class PipeCondition(_BaseCondition):
       assert self._nwaiters > 0
       self._nwaiters -= 1
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
     """
@@ -327,7 +386,7 @@ class _CountingCondition(object):
     self._cond = threading.Condition(lock=lock)
     self._nwaiters = 0
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notifies the condition.
 
     """
@@ -545,7 +604,7 @@ class SharedLock(object):
   def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
-    @type shared: int
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
@@ -658,6 +717,7 @@ class LockSet:
   def __init__(self, members=None):
     """Constructs a new LockSet.
 
+    @type members: list of strings
     @param members: initial members of the set
 
     """
@@ -699,6 +759,9 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
+    assert not (name is None and self.__lock._is_owned()), \
+           "Cannot hold internal lock when deleting owner status"
+
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
@@ -717,7 +780,9 @@ class LockSet:
   def _release_and_delete_owned(self):
     """Release and delete all resources owned by the current thread"""
     for lname in self._list_owned():
-      self.__lockdict[lname].release()
+      lock = self.__lockdict[lname]
+      if lock._is_owned():
+        lock.release()
       self._del_owned(name=lname)
 
   def __names(self):
@@ -751,8 +816,10 @@ class LockSet:
   def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float or None
@@ -774,135 +841,131 @@ class LockSet:
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
-    remaining_timeout = timeout
-    if timeout is None:
-      start = None
-      calc_remaining_timeout = lambda: None
-    else:
-      start = time.time()
-      calc_remaining_timeout = lambda: (start + timeout) - time.time()
-
-    want_all = names is None
-
-    if want_all:
-      # If no names are given acquire the whole set by not letting new names
-      # being added before we release, and getting the current list of names.
-      # Some of them may then be deleted later, but we'll cope with this.
-      #
-      # We'd like to acquire this lock in a shared way, as it's nice if
-      # everybody else can use the instances at the same time. If are acquiring
-      # them exclusively though they won't be able to do this anyway, though,
-      # so we'll get the list lock exclusively as well in order to be able to
-      # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared, timeout=remaining_timeout)
-      try:
-        # note we own the set-lock
-        self._add_owned()
-        names = self.__names()
-      except:
-        # We shouldn't have problems adding the lock to the owners list, but
-        # if we did we'll try to release this lock and re-raise exception.
-        # Of course something is going to be really wrong, after this.
-        self.__lock.release()
-        raise
-
-      # Re-calculate timeout
-      remaining_timeout = calc_remaining_timeout()
+    running_timeout = RunningTimeout(timeout, False)
 
     try:
-      try:
+      if names is not None:
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
           names = [names]
-        else:
-          names = sorted(names)
 
-        acquire_list = []
-        # First we look the locks up on __lockdict. We have no way of being sure
-        # they will still be there after, but this makes it a lot faster should
-        # just one of them be the already wrong
-        for lname in utils.UniqueSequence(names):
-          try:
-            lock = self.__lockdict[lname] # raises KeyError if lock is not there
-            acquire_list.append((lname, lock))
-          except KeyError:
-            if want_all:
-              # We are acquiring all the set, it doesn't matter if this
-              # particular element is not there anymore.
-              continue
-            else:
-              raise errors.LockError("Non-existing lock in set (%s)" % lname)
-
-        # This will hold the locknames we effectively acquired.
-        acquired = set()
-
-        # Now acquire_list contains a sorted list of resources and locks we
-        # want.  In order to get them we loop on this (private) list and
-        # acquire() them.  We gave no real guarantee they will still exist till
-        # this is done but .acquire() itself is safe and will alert us if the
-        # lock gets deleted.
-        for (lname, lock) in acquire_list:
-          if __debug__ and callable(test_notify):
-            test_notify_fn = lambda: test_notify(lname)
-          else:
-            test_notify_fn = None
+        return self.__acquire_inner(names, False, shared,
+                                    running_timeout.Remaining, test_notify)
 
-          try:
-            if timeout is not None and remaining_timeout < 0:
-              raise _AcquireTimeout()
-
-            # raises LockError if the lock was deleted
-            if not lock.acquire(shared=shared, timeout=remaining_timeout,
-                                test_notify=test_notify_fn):
-              # Couldn't get lock or timeout occurred
-              if timeout is None:
-                # This shouldn't happen as SharedLock.acquire(timeout=None) is
-                # blocking.
-                raise errors.LockError("Failed to get lock %s" % lname)
-
-              raise _AcquireTimeout()
-
-            # Re-calculate timeout
-            remaining_timeout = calc_remaining_timeout()
-
-            # now the lock cannot be deleted, we have it!
-            self._add_owned(name=lname)
-            acquired.add(lname)
-
-          except _AcquireTimeout:
-            # Release all acquired locks
-            self._release_and_delete_owned()
-            raise
+      else:
+        # If no names are given acquire the whole set by not letting new names
+        # being added before we release, and getting the current list of names.
+        # Some of them may then be deleted later, but we'll cope with this.
+        #
+        # We'd like to acquire this lock in a shared way, as it's nice if
+        # everybody else can use the instances at the same time. If are
+        # acquiring them exclusively though they won't be able to do this
+        # anyway, though, so we'll get the list lock exclusively as well in
+        # order to be able to do add() on the set while owning it.
+        if not self.__lock.acquire(shared=shared,
+                                   timeout=running_timeout.Remaining()):
+          raise _AcquireTimeout()
+        try:
+          # note we own the set-lock
+          self._add_owned()
+
+          return self.__acquire_inner(self.__names(), True, shared,
+                                      running_timeout.Remaining, test_notify)
+        except:
+          # We shouldn't have problems adding the lock to the owners list, but
+          # if we did we'll try to release this lock and re-raise exception.
+          # Of course something is going to be really wrong, after this.
+          self.__lock.release()
+          self._del_owned()
+          raise
 
-          except errors.LockError:
-            if want_all:
-              # We are acquiring all the set, it doesn't matter if this
-              # particular element is not there anymore.
-              continue
+    except _AcquireTimeout:
+      return None
 
-            self._release_and_delete_owned()
+  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+    """Inner logic for acquiring a number of locks.
 
-            raise errors.LockError("Non-existing lock in set (%s)" % lname)
+    @param names: Names of the locks to be acquired
+    @param want_all: Whether all locks in the set should be acquired
+    @param shared: Whether to acquire in shared mode
+    @param timeout_fn: Function returning remaining timeout
+    @param test_notify: Special callback function for unittesting
 
-          except:
-            # We shouldn't have problems adding the lock to the owners list, but
-            # if we did we'll try to release this lock and re-raise exception.
-            # Of course something is going to be really wrong, after this.
-            if lock._is_owned():
-              lock.release()
-            raise
+    """
+    acquire_list = []
 
-      except:
-        # If something went wrong and we had the set-lock let's release it...
+    # First we look the locks up on __lockdict. We have no way of being sure
+    # they will still be there after, but this makes it a lot faster should
+    # just one of them be the already wrong. Using a sorted sequence to prevent
+    # deadlocks.
+    for lname in sorted(utils.UniqueSequence(names)):
+      try:
+        lock = self.__lockdict[lname] # raises KeyError if lock is not there
+      except KeyError:
         if want_all:
-          self.__lock.release()
-        raise
+          # We are acquiring all the set, it doesn't matter if this particular
+          # element is not there anymore.
+          continue
 
-    except _AcquireTimeout:
-      if want_all:
-        self._del_owned()
+        raise errors.LockError("Non-existing lock in set (%s)" % lname)
 
-      return None
+      acquire_list.append((lname, lock))
+
+    # This will hold the locknames we effectively acquired.
+    acquired = set()
+
+    try:
+      # Now acquire_list contains a sorted list of resources and locks we
+      # want.  In order to get them we loop on this (private) list and
+      # acquire() them.  We gave no real guarantee they will still exist till
+      # this is done but .acquire() itself is safe and will alert us if the
+      # lock gets deleted.
+      for (lname, lock) in acquire_list:
+        if __debug__ and callable(test_notify):
+          test_notify_fn = lambda: test_notify(lname)
+        else:
+          test_notify_fn = None
+
+        timeout = timeout_fn()
+
+        try:
+          # raises LockError if the lock was deleted
+          acq_success = lock.acquire(shared=shared, timeout=timeout,
+                                     test_notify=test_notify_fn)
+        except errors.LockError:
+          if want_all:
+            # We are acquiring all the set, it doesn't matter if this
+            # particular element is not there anymore.
+            continue
+
+          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        if not acq_success:
+          # Couldn't get lock or timeout occurred
+          if timeout is None:
+            # This shouldn't happen as SharedLock.acquire(timeout=None) is
+            # blocking.
+            raise errors.LockError("Failed to get lock %s" % lname)
+
+          raise _AcquireTimeout()
+
+        try:
+          # now the lock cannot be deleted, we have it!
+          self._add_owned(name=lname)
+          acquired.add(lname)
+
+        except:
+          # We shouldn't have problems adding the lock to the owners list, but
+          # if we did we'll try to release this lock and re-raise exception.
+          # Of course something is going to be really wrong after this.
+          if lock._is_owned():
+            lock.release()
+          raise
+
+    except:
+      # Release all owned locks
+      self._release_and_delete_owned()
+      raise
 
     return acquired
 
@@ -912,6 +975,7 @@ class LockSet:
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
@@ -945,8 +1009,11 @@ class LockSet:
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
+    @type names: list of strings
     @param names: names of the new elements to add
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: pre-acquire the new resource?
+    @type shared: integer (0/1) used as a boolean
     @param shared: is the pre-acquisition shared?
 
     """
@@ -1006,6 +1073,7 @@ class LockSet:
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
+    @type names: list of strings
     @param names: names of the resource to remove.
 
     @return: a list of locks which we removed; the list is always
@@ -1150,7 +1218,7 @@ class GanetiLockManager:
     # the test cases.
     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self):
+  def _BGL_owned(self): # pylint: disable-msg=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
@@ -1158,7 +1226,8 @@ class GanetiLockManager:
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
-  def _contains_BGL(self, level, names):
+  @staticmethod
+  def _contains_BGL(level, names): # pylint: disable-msg=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1170,10 +1239,12 @@ class GanetiLockManager:
   def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
-    @param level: the level at which the locks shall be acquired;
-        it must be a member of LEVELS.
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be acquired
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
     @type timeout: float
@@ -1204,8 +1275,9 @@ class GanetiLockManager:
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
-    @param level: the level at which the locks shall be released;
-        it must be a member of LEVELS
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be released
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -1214,7 +1286,9 @@ class GanetiLockManager:
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels")
+            " at upper levels (%r)" %
+            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+                              for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1222,10 +1296,13 @@ class GanetiLockManager:
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
-    @param level: the level at which the locks shall be added;
-        it must be a member of LEVELS_MOD.
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be added
+    @type names: list of strings
     @param names: names of the locks to acquire
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: whether to acquire the newly added locks
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether the acquisition will be shared
 
     """
@@ -1242,8 +1319,9 @@ class GanetiLockManager:
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
-    @param level: the level at which the locks shall be removed;
-        it must be a member of LEVELS_MOD
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be removed
+    @type names: list of strings
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)