Code and docstring style fixes
[ganeti-local] / lib / locking.py
index 901fdac..382a45c 100644 (file)
@@ -18,6 +18,9 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
+# Disable "Invalid name ..." message
+# pylint: disable-msg=C0103
+
 """Module implementing the Ganeti locking code."""
 
 import os
@@ -49,21 +52,18 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
-class _SingleActionPipeConditionWaiter(object):
-  """Callable helper class for _SingleActionPipeCondition.
+class _SingleNotifyPipeConditionWaiter(object):
+  """Helper class for SingleNotifyPipeCondition
 
   """
   __slots__ = [
-    "_cond",
     "_fd",
     "_poller",
     ]
 
-  def __init__(self, cond, poller, fd):
-    """Initializes this class.
+  def __init__(self, poller, fd):
+    """Constructor for _SingleNotifyPipeConditionWaiter
 
-    @type cond: L{_SingleActionPipeCondition}
-    @param cond: Parent condition
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
@@ -71,8 +71,6 @@ class _SingleActionPipeConditionWaiter(object):
 
     """
     object.__init__(self)
-
-    self._cond = cond
     self._poller = poller
     self._fd = fd
 
@@ -103,105 +101,96 @@ class _SingleActionPipeConditionWaiter(object):
         remaining_time = start_time + timeout - time.time()
 
 
-class _SingleActionPipeCondition(object):
-  """Wrapper around a pipe for usage inside conditions.
-
-  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
-  always allocated when constructing the class. Extra care is taken to always
-  close the file descriptors.
+class _BaseCondition(object):
+  """Base class containing common code for conditions.
 
-  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
-  notifications.
-
-  Warning: This class is designed to be used as the underlying component of a
-  locking condition, but is not by itself thread safe, and needs to be
-  protected by an external lock.
+  Some of this code is taken from python's threading module.
 
   """
   __slots__ = [
-    "_poller",
-    "_read_fd",
-    "_write_fd",
-    "_nwaiters",
+    "_lock",
+    "acquire",
+    "release",
     ]
 
-  _waiter_class = _SingleActionPipeConditionWaiter
+  def __init__(self, lock):
+    """Constructor for _BaseCondition.
 
-  def __init__(self):
-    """Initializes this class.
+    @type lock: threading.Lock
+    @param lock: condition base lock
 
     """
     object.__init__(self)
 
-    self._nwaiters = 0
-
-    # Just assume the unpacking is successful, otherwise error handling gets
-    # very complicated.
-    (self._read_fd, self._write_fd) = os.pipe()
-    try:
-      # The poller looks for closure of the write side
-      poller = select.poll()
-      poller.register(self._read_fd, select.POLLHUP)
-
-      self._poller = poller
-    except:
-      if self._read_fd is not None:
-        os.close(self._read_fd)
-      if self._write_fd is not None:
-        os.close(self._write_fd)
-      raise
+    # Recursive locks are not supported
+    assert not hasattr(lock, "_acquire_restore")
+    assert not hasattr(lock, "_release_save")
 
-    # There should be no code here anymore, otherwise the pipe file descriptors
-    # may be not be cleaned up properly in case of errors.
+    self._lock = lock
 
-  def StartWaiting(self):
-    """Return function to wait for notification.
+    # Export the lock's acquire() and release() methods
+    self.acquire = lock.acquire
+    self.release = lock.release
 
-    @rtype: L{_SingleActionPipeConditionWaiter}
-    @return: Function to wait for notification
+  def _is_owned(self):
+    """Check whether lock is owned by current thread.
 
     """
-    assert self._nwaiters >= 0
+    if self._lock.acquire(0):
+      self._lock.release()
+      return False
 
-    if self._poller is None:
-      raise RuntimeError("Already cleaned up")
+    return True
 
-    # Create waiter function and increase number of waiters
-    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
-    self._nwaiters += 1
-    return wait_fn
+  def _check_owned(self):
+    """Raise an exception if the current thread doesn't own the lock.
 
-  def DoneWaiting(self):
-    """Decrement number of waiters and automatic cleanup.
+    """
+    if not self._is_owned():
+      raise RuntimeError("cannot work with un-aquired lock")
 
-    Must be called after waiting for a notification.
 
-    @rtype: bool
-    @return: Whether this was the last waiter
+class SingleNotifyPipeCondition(_BaseCondition):
+  """Condition which can only be notified once.
 
-    """
-    assert self._nwaiters > 0
+  This condition class uses pipes and poll, internally, to be able to wait for
+  notification with a timeout, without resorting to polling. It is almost
+  compatible with Python's threading.Condition, with the following differences:
+    - notifyAll can only be called once, and no wait can happen after that
+    - notify is not supported, only notifyAll
 
-    self._nwaiters -= 1
+  """
 
-    if self._nwaiters == 0:
-      self._Cleanup()
-      return True
+  __slots__ = _BaseCondition.__slots__ + [
+    "_poller",
+    "_read_fd",
+    "_write_fd",
+    "_nwaiters",
+    "_notified",
+    ]
 
-    return False
+  _waiter_class = _SingleNotifyPipeConditionWaiter
 
-  def notifyAll(self):
-    """Close the writing side of the pipe to notify all waiters.
+  def __init__(self, lock):
+    """Constructor for SingleNotifyPipeCondition
 
     """
-    if self._write_fd is None:
-      raise RuntimeError("Can only notify once")
-
-    os.close(self._write_fd)
+    _BaseCondition.__init__(self, lock)
+    self._nwaiters = 0
+    self._notified = False
+    self._read_fd = None
     self._write_fd = None
+    self._poller = None
+
+  def _check_unnotified(self):
+    """Throws an exception if already notified.
+
+    """
+    if self._notified:
+      raise RuntimeError("cannot use already notified condition")
 
   def _Cleanup(self):
-    """Close all file descriptors.
+    """Cleanup open file descriptors, if any.
 
     """
     if self._read_fd is not None:
@@ -211,19 +200,51 @@ class _SingleActionPipeCondition(object):
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
-
     self._poller = None
 
-  def __del__(self):
-    """Called on object deletion.
+  def wait(self, timeout=None):
+    """Wait for a notification.
+
+    @type timeout: float or None
+    @param timeout: Waiting timeout (can be None)
+
+    """
+    self._check_owned()
+    self._check_unnotified()
 
-    Ensure no file descriptors are left open.
+    self._nwaiters += 1
+    try:
+      if self._poller is None:
+        (self._read_fd, self._write_fd) = os.pipe()
+        self._poller = select.poll()
+        self._poller.register(self._read_fd, select.POLLHUP)
+
+      wait_fn = self._waiter_class(self._poller, self._read_fd)
+      self.release()
+      try:
+        # Wait for notification
+        wait_fn(timeout)
+      finally:
+        # Re-acquire lock
+        self.acquire()
+    finally:
+      self._nwaiters -= 1
+      if self._nwaiters == 0:
+        self._Cleanup()
+
+  def notifyAll(self):
+    """Close the writing side of the pipe to notify all waiters.
 
     """
-    self._Cleanup()
+    self._check_owned()
+    self._check_unnotified()
+    self._notified = True
+    if self._write_fd is not None:
+      os.close(self._write_fd)
+      self._write_fd = None
 
 
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
@@ -233,51 +254,20 @@ class _PipeCondition(object):
   there are any waiting threads.
 
   """
-  __slots__ = [
-    "_lock",
+  __slots__ = _BaseCondition.__slots__ + [
     "_nwaiters",
-    "_pipe",
-    "acquire",
-    "release",
+    "_single_condition",
     ]
 
-  _pipe_class = _SingleActionPipeCondition
+  _single_condition_class = SingleNotifyPipeCondition
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
-    object.__init__(self)
-
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
-
-    self._lock = lock
-
-    # Export the lock's acquire() and release() methods
-    self.acquire = lock.acquire
-    self.release = lock.release
-
+    _BaseCondition.__init__(self, lock)
     self._nwaiters = 0
-    self._pipe = None
-
-  def _is_owned(self):
-    """Check whether lock is owned by current thread.
-
-    """
-    if self._lock.acquire(0):
-      self._lock.release()
-      return False
-
-    return True
-
-  def _check_owned(self):
-    """Raise an exception if the current thread doesn't own the lock.
-
-    """
-    if not self._is_owned():
-      raise RuntimeError("cannot work with un-aquired lock")
+    self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
     """Wait for a notification.
@@ -288,32 +278,14 @@ class _PipeCondition(object):
     """
     self._check_owned()
 
-    if not self._pipe:
-      self._pipe = self._pipe_class()
-
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    pipe = self._pipe
+    my_condition = self._single_condition
 
     assert self._nwaiters >= 0
     self._nwaiters += 1
     try:
-      # Get function to wait on the pipe
-      wait_fn = pipe.StartWaiting()
-      try:
-        # Release lock while waiting
-        self.release()
-        try:
-          # Wait for notification
-          wait_fn(timeout)
-        finally:
-          # Re-acquire lock
-          self.acquire()
-      finally:
-        # Destroy pipe if this was the last waiter and the current pipe is
-        # still the same. The same pipe cannot be reused after cleanup.
-        if pipe.DoneWaiting() and pipe == self._pipe:
-          self._pipe = None
+      my_condition.wait(timeout)
     finally:
       assert self._nwaiters > 0
       self._nwaiters -= 1
@@ -323,12 +295,8 @@ class _PipeCondition(object):
 
     """
     self._check_owned()
-
-    # Notify and forget pipe. A new one will be created on the next call to
-    # wait.
-    if self._pipe is not None:
-      self._pipe.notifyAll()
-      self._pipe = None
+    self._single_condition.notifyAll()
+    self._single_condition = self._single_condition_class(self._lock)
 
   def has_waiting(self):
     """Returns whether there are active waiters.
@@ -369,7 +337,7 @@ class _CountingCondition(object):
     """Waits for the condition to be notified.
 
     @type timeout: float or None
-    @param timeout: Timeout in seconds
+    @param timeout: Waiting timeout (can be None)
 
     """
     assert self._nwaiters >= 0
@@ -409,7 +377,7 @@ class SharedLock(object):
     "__shr",
     ]
 
-  __condition_class = _CountingCondition
+  __condition_class = PipeCondition
 
   def __init__(self):
     """Construct a new SharedLock.
@@ -520,7 +488,7 @@ class SharedLock(object):
     """
     return self.__pending[0] == cond
 
-  def __acquire_unlocked(self, shared=0, timeout=None):
+  def __acquire_unlocked(self, shared, timeout):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
@@ -574,7 +542,7 @@ class SharedLock(object):
 
     return False
 
-  def acquire(self, shared=0, timeout=None):
+  def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
     @type shared: int
@@ -582,10 +550,16 @@ class SharedLock(object):
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
     """
     self.__lock.acquire()
     try:
+      # We already got the lock, notify now
+      if __debug__ and callable(test_notify):
+        test_notify()
+
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
@@ -641,7 +615,10 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(timeout)
+        acquired = self.__acquire_unlocked(0, timeout)
+
+        assert self.__is_exclusive() and not self.__is_sharer(), \
+          "Lock wasn't acquired in exclusive mode"
 
       if acquired:
         self.__deleted = True
@@ -661,6 +638,12 @@ class SharedLock(object):
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
@@ -731,6 +714,12 @@ class LockSet:
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      self.__lockdict[lname].release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
@@ -759,31 +748,43 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, blocking=1, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
-    if not blocking:
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    if names is None:
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    remaining_timeout = timeout
+    if timeout is None:
+      start = None
+      calc_remaining_timeout = lambda: None
+    else:
+      start = time.time()
+      calc_remaining_timeout = lambda: (start + timeout) - time.time()
+
+    want_all = names is None
+
+    if want_all:
       # If no names are given acquire the whole set by not letting new names
       # being added before we release, and getting the current list of names.
       # Some of them may then be deleted later, but we'll cope with this.
@@ -793,7 +794,7 @@ class LockSet:
       # them exclusively though they won't be able to do this anyway, though,
       # so we'll get the list lock exclusively as well in order to be able to
       # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
+      self.__lock.acquire(shared=shared, timeout=remaining_timeout)
       try:
         # note we own the set-lock
         self._add_owned()
@@ -805,65 +806,103 @@ class LockSet:
         self.__lock.release()
         raise
 
+      # Re-calculate timeout
+      remaining_timeout = calc_remaining_timeout()
+
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
-      else:
-        names = sorted(names)
-
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
-        try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
-        try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
-          # now the lock cannot be deleted, we have it!
-          self._add_owned(name=lname)
-          acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
+      try:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+        else:
+          names = sorted(names)
+
+        acquire_list = []
+        # First we look the locks up on __lockdict. We have no way of being sure
+        # they will still be there after, but this makes it a lot faster should
+        # just one of them be the already wrong
+        for lname in utils.UniqueSequence(names):
+          try:
+            lock = self.__lockdict[lname] # raises KeyError if lock is not there
+            acquire_list.append((lname, lock))
+          except KeyError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+            else:
+              raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        # This will hold the locknames we effectively acquired.
+        acquired = set()
+
+        # Now acquire_list contains a sorted list of resources and locks we
+        # want.  In order to get them we loop on this (private) list and
+        # acquire() them.  We gave no real guarantee they will still exist till
+        # this is done but .acquire() itself is safe and will alert us if the
+        # lock gets deleted.
+        for (lname, lock) in acquire_list:
+          if __debug__ and callable(test_notify):
+            test_notify_fn = lambda: test_notify(lname)
           else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
-        except:
-          # We shouldn't have problems adding the lock to the owners list, but
-          # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
-          if lock._is_owned():
-            lock.release()
-          raise
+            test_notify_fn = None
 
-    except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
-      raise
+          try:
+            if timeout is not None and remaining_timeout < 0:
+              raise _AcquireTimeout()
+
+            # raises LockError if the lock was deleted
+            if not lock.acquire(shared=shared, timeout=remaining_timeout,
+                                test_notify=test_notify_fn):
+              # Couldn't get lock or timeout occurred
+              if timeout is None:
+                # This shouldn't happen as SharedLock.acquire(timeout=None) is
+                # blocking.
+                raise errors.LockError("Failed to get lock %s" % lname)
+
+              raise _AcquireTimeout()
+
+            # Re-calculate timeout
+            remaining_timeout = calc_remaining_timeout()
+
+            # now the lock cannot be deleted, we have it!
+            self._add_owned(name=lname)
+            acquired.add(lname)
+
+          except _AcquireTimeout:
+            # Release all acquired locks
+            self._release_and_delete_owned()
+            raise
+
+          except errors.LockError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+
+            self._release_and_delete_owned()
+
+            raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+          except:
+            # We shouldn't have problems adding the lock to the owners list, but
+            # if we did we'll try to release this lock and re-raise exception.
+            # Of course something is going to be really wrong, after this.
+            if lock._is_owned():
+              lock.release()
+            raise
+
+      except:
+        # If something went wrong and we had the set-lock let's release it...
+        if want_all:
+          self.__lock.release()
+        raise
+
+    except _AcquireTimeout:
+      if want_all:
+        self._del_owned()
+
+      return None
 
     return acquired
 
@@ -961,26 +1000,19 @@ class LockSet:
 
     return True
 
-  def remove(self, names, blocking=1):
+  def remove(self, names):
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
     @param names: names of the resource to remove.
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported
-        yet unless you are already holding exclusively the locks)
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
         equal to the names list if we were holding all the locks
         exclusively
 
     """
-    if not blocking and not self._is_owned():
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
-
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
@@ -1135,7 +1167,7 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, blocking=1, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
     @param level: the level at which the locks shall be acquired;
@@ -1144,8 +1176,8 @@ class GanetiLockManager:
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float
+    @param timeout: Maximum time to acquire all locks
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1164,8 +1196,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared,
-                                         blocking=blocking)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1205,7 +1236,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
-  def remove(self, level, names, blocking=1):
+  def remove(self, level, names):
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
@@ -1215,8 +1246,6 @@ class GanetiLockManager:
         it must be a member of LEVELS_MOD
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
-    @param blocking: whether to block while trying to operate in
-        try-lock mode (this locking mode is not supported yet)
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
@@ -1228,4 +1257,4 @@ class GanetiLockManager:
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
-    return self.__keyring[level].remove(names, blocking=blocking)
+    return self.__keyring[level].remove(names)