Code and docstring style fixes
[ganeti-local] / lib / locking.py
index 901fdac..382a45c 100644 (file)
@@ -18,6 +18,9 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
+# Disable "Invalid name ..." message
+# pylint: disable-msg=C0103
+
 """Module implementing the Ganeti locking code."""
 
 import os
 """Module implementing the Ganeti locking code."""
 
 import os
@@ -49,21 +52,18 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
   return wrap
 
 
-class _SingleActionPipeConditionWaiter(object):
-  """Callable helper class for _SingleActionPipeCondition.
+class _SingleNotifyPipeConditionWaiter(object):
+  """Helper class for SingleNotifyPipeCondition
 
   """
   __slots__ = [
 
   """
   __slots__ = [
-    "_cond",
     "_fd",
     "_poller",
     ]
 
     "_fd",
     "_poller",
     ]
 
-  def __init__(self, cond, poller, fd):
-    """Initializes this class.
+  def __init__(self, poller, fd):
+    """Constructor for _SingleNotifyPipeConditionWaiter
 
 
-    @type cond: L{_SingleActionPipeCondition}
-    @param cond: Parent condition
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
@@ -71,8 +71,6 @@ class _SingleActionPipeConditionWaiter(object):
 
     """
     object.__init__(self)
 
     """
     object.__init__(self)
-
-    self._cond = cond
     self._poller = poller
     self._fd = fd
 
     self._poller = poller
     self._fd = fd
 
@@ -103,105 +101,96 @@ class _SingleActionPipeConditionWaiter(object):
         remaining_time = start_time + timeout - time.time()
 
 
         remaining_time = start_time + timeout - time.time()
 
 
-class _SingleActionPipeCondition(object):
-  """Wrapper around a pipe for usage inside conditions.
-
-  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
-  always allocated when constructing the class. Extra care is taken to always
-  close the file descriptors.
+class _BaseCondition(object):
+  """Base class containing common code for conditions.
 
 
-  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
-  notifications.
-
-  Warning: This class is designed to be used as the underlying component of a
-  locking condition, but is not by itself thread safe, and needs to be
-  protected by an external lock.
+  Some of this code is taken from python's threading module.
 
   """
   __slots__ = [
 
   """
   __slots__ = [
-    "_poller",
-    "_read_fd",
-    "_write_fd",
-    "_nwaiters",
+    "_lock",
+    "acquire",
+    "release",
     ]
 
     ]
 
-  _waiter_class = _SingleActionPipeConditionWaiter
+  def __init__(self, lock):
+    """Constructor for _BaseCondition.
 
 
-  def __init__(self):
-    """Initializes this class.
+    @type lock: threading.Lock
+    @param lock: condition base lock
 
     """
     object.__init__(self)
 
 
     """
     object.__init__(self)
 
-    self._nwaiters = 0
-
-    # Just assume the unpacking is successful, otherwise error handling gets
-    # very complicated.
-    (self._read_fd, self._write_fd) = os.pipe()
-    try:
-      # The poller looks for closure of the write side
-      poller = select.poll()
-      poller.register(self._read_fd, select.POLLHUP)
-
-      self._poller = poller
-    except:
-      if self._read_fd is not None:
-        os.close(self._read_fd)
-      if self._write_fd is not None:
-        os.close(self._write_fd)
-      raise
+    # Recursive locks are not supported
+    assert not hasattr(lock, "_acquire_restore")
+    assert not hasattr(lock, "_release_save")
 
 
-    # There should be no code here anymore, otherwise the pipe file descriptors
-    # may be not be cleaned up properly in case of errors.
+    self._lock = lock
 
 
-  def StartWaiting(self):
-    """Return function to wait for notification.
+    # Export the lock's acquire() and release() methods
+    self.acquire = lock.acquire
+    self.release = lock.release
 
 
-    @rtype: L{_SingleActionPipeConditionWaiter}
-    @return: Function to wait for notification
+  def _is_owned(self):
+    """Check whether lock is owned by current thread.
 
     """
 
     """
-    assert self._nwaiters >= 0
+    if self._lock.acquire(0):
+      self._lock.release()
+      return False
 
 
-    if self._poller is None:
-      raise RuntimeError("Already cleaned up")
+    return True
 
 
-    # Create waiter function and increase number of waiters
-    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
-    self._nwaiters += 1
-    return wait_fn
+  def _check_owned(self):
+    """Raise an exception if the current thread doesn't own the lock.
 
 
-  def DoneWaiting(self):
-    """Decrement number of waiters and automatic cleanup.
+    """
+    if not self._is_owned():
+      raise RuntimeError("cannot work with un-aquired lock")
 
 
-    Must be called after waiting for a notification.
 
 
-    @rtype: bool
-    @return: Whether this was the last waiter
+class SingleNotifyPipeCondition(_BaseCondition):
+  """Condition which can only be notified once.
 
 
-    """
-    assert self._nwaiters > 0
+  This condition class uses pipes and poll, internally, to be able to wait for
+  notification with a timeout, without resorting to polling. It is almost
+  compatible with Python's threading.Condition, with the following differences:
+    - notifyAll can only be called once, and no wait can happen after that
+    - notify is not supported, only notifyAll
 
 
-    self._nwaiters -= 1
+  """
 
 
-    if self._nwaiters == 0:
-      self._Cleanup()
-      return True
+  __slots__ = _BaseCondition.__slots__ + [
+    "_poller",
+    "_read_fd",
+    "_write_fd",
+    "_nwaiters",
+    "_notified",
+    ]
 
 
-    return False
+  _waiter_class = _SingleNotifyPipeConditionWaiter
 
 
-  def notifyAll(self):
-    """Close the writing side of the pipe to notify all waiters.
+  def __init__(self, lock):
+    """Constructor for SingleNotifyPipeCondition
 
     """
 
     """
-    if self._write_fd is None:
-      raise RuntimeError("Can only notify once")
-
-    os.close(self._write_fd)
+    _BaseCondition.__init__(self, lock)
+    self._nwaiters = 0
+    self._notified = False
+    self._read_fd = None
     self._write_fd = None
     self._write_fd = None
+    self._poller = None
+
+  def _check_unnotified(self):
+    """Throws an exception if already notified.
+
+    """
+    if self._notified:
+      raise RuntimeError("cannot use already notified condition")
 
   def _Cleanup(self):
 
   def _Cleanup(self):
-    """Close all file descriptors.
+    """Cleanup open file descriptors, if any.
 
     """
     if self._read_fd is not None:
 
     """
     if self._read_fd is not None:
@@ -211,19 +200,51 @@ class _SingleActionPipeCondition(object):
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
-
     self._poller = None
 
     self._poller = None
 
-  def __del__(self):
-    """Called on object deletion.
+  def wait(self, timeout=None):
+    """Wait for a notification.
+
+    @type timeout: float or None
+    @param timeout: Waiting timeout (can be None)
+
+    """
+    self._check_owned()
+    self._check_unnotified()
 
 
-    Ensure no file descriptors are left open.
+    self._nwaiters += 1
+    try:
+      if self._poller is None:
+        (self._read_fd, self._write_fd) = os.pipe()
+        self._poller = select.poll()
+        self._poller.register(self._read_fd, select.POLLHUP)
+
+      wait_fn = self._waiter_class(self._poller, self._read_fd)
+      self.release()
+      try:
+        # Wait for notification
+        wait_fn(timeout)
+      finally:
+        # Re-acquire lock
+        self.acquire()
+    finally:
+      self._nwaiters -= 1
+      if self._nwaiters == 0:
+        self._Cleanup()
+
+  def notifyAll(self):
+    """Close the writing side of the pipe to notify all waiters.
 
     """
 
     """
-    self._Cleanup()
+    self._check_owned()
+    self._check_unnotified()
+    self._notified = True
+    if self._write_fd is not None:
+      os.close(self._write_fd)
+      self._write_fd = None
 
 
 
 
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
@@ -233,51 +254,20 @@ class _PipeCondition(object):
   there are any waiting threads.
 
   """
   there are any waiting threads.
 
   """
-  __slots__ = [
-    "_lock",
+  __slots__ = _BaseCondition.__slots__ + [
     "_nwaiters",
     "_nwaiters",
-    "_pipe",
-    "acquire",
-    "release",
+    "_single_condition",
     ]
 
     ]
 
-  _pipe_class = _SingleActionPipeCondition
+  _single_condition_class = SingleNotifyPipeCondition
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
-    object.__init__(self)
-
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
-
-    self._lock = lock
-
-    # Export the lock's acquire() and release() methods
-    self.acquire = lock.acquire
-    self.release = lock.release
-
+    _BaseCondition.__init__(self, lock)
     self._nwaiters = 0
     self._nwaiters = 0
-    self._pipe = None
-
-  def _is_owned(self):
-    """Check whether lock is owned by current thread.
-
-    """
-    if self._lock.acquire(0):
-      self._lock.release()
-      return False
-
-    return True
-
-  def _check_owned(self):
-    """Raise an exception if the current thread doesn't own the lock.
-
-    """
-    if not self._is_owned():
-      raise RuntimeError("cannot work with un-aquired lock")
+    self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
     """Wait for a notification.
 
   def wait(self, timeout=None):
     """Wait for a notification.
@@ -288,32 +278,14 @@ class _PipeCondition(object):
     """
     self._check_owned()
 
     """
     self._check_owned()
 
-    if not self._pipe:
-      self._pipe = self._pipe_class()
-
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    pipe = self._pipe
+    my_condition = self._single_condition
 
     assert self._nwaiters >= 0
     self._nwaiters += 1
     try:
 
     assert self._nwaiters >= 0
     self._nwaiters += 1
     try:
-      # Get function to wait on the pipe
-      wait_fn = pipe.StartWaiting()
-      try:
-        # Release lock while waiting
-        self.release()
-        try:
-          # Wait for notification
-          wait_fn(timeout)
-        finally:
-          # Re-acquire lock
-          self.acquire()
-      finally:
-        # Destroy pipe if this was the last waiter and the current pipe is
-        # still the same. The same pipe cannot be reused after cleanup.
-        if pipe.DoneWaiting() and pipe == self._pipe:
-          self._pipe = None
+      my_condition.wait(timeout)
     finally:
       assert self._nwaiters > 0
       self._nwaiters -= 1
     finally:
       assert self._nwaiters > 0
       self._nwaiters -= 1
@@ -323,12 +295,8 @@ class _PipeCondition(object):
 
     """
     self._check_owned()
 
     """
     self._check_owned()
-
-    # Notify and forget pipe. A new one will be created on the next call to
-    # wait.
-    if self._pipe is not None:
-      self._pipe.notifyAll()
-      self._pipe = None
+    self._single_condition.notifyAll()
+    self._single_condition = self._single_condition_class(self._lock)
 
   def has_waiting(self):
     """Returns whether there are active waiters.
 
   def has_waiting(self):
     """Returns whether there are active waiters.
@@ -369,7 +337,7 @@ class _CountingCondition(object):
     """Waits for the condition to be notified.
 
     @type timeout: float or None
     """Waits for the condition to be notified.
 
     @type timeout: float or None
-    @param timeout: Timeout in seconds
+    @param timeout: Waiting timeout (can be None)
 
     """
     assert self._nwaiters >= 0
 
     """
     assert self._nwaiters >= 0
@@ -409,7 +377,7 @@ class SharedLock(object):
     "__shr",
     ]
 
     "__shr",
     ]
 
-  __condition_class = _CountingCondition
+  __condition_class = PipeCondition
 
   def __init__(self):
     """Construct a new SharedLock.
 
   def __init__(self):
     """Construct a new SharedLock.
@@ -520,7 +488,7 @@ class SharedLock(object):
     """
     return self.__pending[0] == cond
 
     """
     return self.__pending[0] == cond
 
-  def __acquire_unlocked(self, shared=0, timeout=None):
+  def __acquire_unlocked(self, shared, timeout):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
@@ -574,7 +542,7 @@ class SharedLock(object):
 
     return False
 
 
     return False
 
-  def acquire(self, shared=0, timeout=None):
+  def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
     @type shared: int
     """Acquire a shared lock.
 
     @type shared: int
@@ -582,10 +550,16 @@ class SharedLock(object):
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
     """
     self.__lock.acquire()
     try:
 
     """
     self.__lock.acquire()
     try:
+      # We already got the lock, notify now
+      if __debug__ and callable(test_notify):
+        test_notify()
+
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
@@ -641,7 +615,10 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(timeout)
+        acquired = self.__acquire_unlocked(0, timeout)
+
+        assert self.__is_exclusive() and not self.__is_sharer(), \
+          "Lock wasn't acquired in exclusive mode"
 
       if acquired:
         self.__deleted = True
 
       if acquired:
         self.__deleted = True
@@ -661,6 +638,12 @@ class SharedLock(object):
 ALL_SET = None
 
 
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
 class LockSet:
   """Implements a set of locks.
 
@@ -731,6 +714,12 @@ class LockSet:
     else:
       return set()
 
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      self.__lockdict[lname].release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
   def __names(self):
     """Return the current set of names.
 
@@ -759,31 +748,43 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, blocking=1, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     """Acquire a set of resource locks.
 
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
-    if not blocking:
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    if names is None:
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    remaining_timeout = timeout
+    if timeout is None:
+      start = None
+      calc_remaining_timeout = lambda: None
+    else:
+      start = time.time()
+      calc_remaining_timeout = lambda: (start + timeout) - time.time()
+
+    want_all = names is None
+
+    if want_all:
       # If no names are given acquire the whole set by not letting new names
       # being added before we release, and getting the current list of names.
       # Some of them may then be deleted later, but we'll cope with this.
       # If no names are given acquire the whole set by not letting new names
       # being added before we release, and getting the current list of names.
       # Some of them may then be deleted later, but we'll cope with this.
@@ -793,7 +794,7 @@ class LockSet:
       # them exclusively though they won't be able to do this anyway, though,
       # so we'll get the list lock exclusively as well in order to be able to
       # do add() on the set while owning it.
       # them exclusively though they won't be able to do this anyway, though,
       # so we'll get the list lock exclusively as well in order to be able to
       # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
+      self.__lock.acquire(shared=shared, timeout=remaining_timeout)
       try:
         # note we own the set-lock
         self._add_owned()
       try:
         # note we own the set-lock
         self._add_owned()
@@ -805,65 +806,103 @@ class LockSet:
         self.__lock.release()
         raise
 
         self.__lock.release()
         raise
 
+      # Re-calculate timeout
+      remaining_timeout = calc_remaining_timeout()
+
     try:
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
-      else:
-        names = sorted(names)
-
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
-        try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
-        try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
-          # now the lock cannot be deleted, we have it!
-          self._add_owned(name=lname)
-          acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
+      try:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+        else:
+          names = sorted(names)
+
+        acquire_list = []
+        # First we look the locks up on __lockdict. We have no way of being sure
+        # they will still be there after, but this makes it a lot faster should
+        # just one of them be the already wrong
+        for lname in utils.UniqueSequence(names):
+          try:
+            lock = self.__lockdict[lname] # raises KeyError if lock is not there
+            acquire_list.append((lname, lock))
+          except KeyError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+            else:
+              raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        # This will hold the locknames we effectively acquired.
+        acquired = set()
+
+        # Now acquire_list contains a sorted list of resources and locks we
+        # want.  In order to get them we loop on this (private) list and
+        # acquire() them.  We gave no real guarantee they will still exist till
+        # this is done but .acquire() itself is safe and will alert us if the
+        # lock gets deleted.
+        for (lname, lock) in acquire_list:
+          if __debug__ and callable(test_notify):
+            test_notify_fn = lambda: test_notify(lname)
           else:
           else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
-        except:
-          # We shouldn't have problems adding the lock to the owners list, but
-          # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
-          if lock._is_owned():
-            lock.release()
-          raise
+            test_notify_fn = None
 
 
-    except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
-      raise
+          try:
+            if timeout is not None and remaining_timeout < 0:
+              raise _AcquireTimeout()
+
+            # raises LockError if the lock was deleted
+            if not lock.acquire(shared=shared, timeout=remaining_timeout,
+                                test_notify=test_notify_fn):
+              # Couldn't get lock or timeout occurred
+              if timeout is None:
+                # This shouldn't happen as SharedLock.acquire(timeout=None) is
+                # blocking.
+                raise errors.LockError("Failed to get lock %s" % lname)
+
+              raise _AcquireTimeout()
+
+            # Re-calculate timeout
+            remaining_timeout = calc_remaining_timeout()
+
+            # now the lock cannot be deleted, we have it!
+            self._add_owned(name=lname)
+            acquired.add(lname)
+
+          except _AcquireTimeout:
+            # Release all acquired locks
+            self._release_and_delete_owned()
+            raise
+
+          except errors.LockError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+
+            self._release_and_delete_owned()
+
+            raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+          except:
+            # We shouldn't have problems adding the lock to the owners list, but
+            # if we did we'll try to release this lock and re-raise exception.
+            # Of course something is going to be really wrong, after this.
+            if lock._is_owned():
+              lock.release()
+            raise
+
+      except:
+        # If something went wrong and we had the set-lock let's release it...
+        if want_all:
+          self.__lock.release()
+        raise
+
+    except _AcquireTimeout:
+      if want_all:
+        self._del_owned()
+
+      return None
 
     return acquired
 
 
     return acquired
 
@@ -961,26 +1000,19 @@ class LockSet:
 
     return True
 
 
     return True
 
-  def remove(self, names, blocking=1):
+  def remove(self, names):
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
     @param names: names of the resource to remove.
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
     @param names: names of the resource to remove.
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported
-        yet unless you are already holding exclusively the locks)
 
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
         equal to the names list if we were holding all the locks
         exclusively
 
     """
         equal to the names list if we were holding all the locks
         exclusively
 
     """
-    if not blocking and not self._is_owned():
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
-
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
@@ -1135,7 +1167,7 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, blocking=1, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
     @param level: the level at which the locks shall be acquired;
     """Acquire a set of resource locks, at the same level.
 
     @param level: the level at which the locks shall be acquired;
@@ -1144,8 +1176,8 @@ class GanetiLockManager:
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float
+    @param timeout: Maximum time to acquire all locks
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1164,8 +1196,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared,
-                                         blocking=blocking)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1205,7 +1236,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
-  def remove(self, level, names, blocking=1):
+  def remove(self, level, names):
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
@@ -1215,8 +1246,6 @@ class GanetiLockManager:
         it must be a member of LEVELS_MOD
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
         it must be a member of LEVELS_MOD
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
-    @param blocking: whether to block while trying to operate in
-        try-lock mode (this locking mode is not supported yet)
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
@@ -1228,4 +1257,4 @@ class GanetiLockManager:
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
-    return self.__keyring[level].remove(names, blocking=blocking)
+    return self.__keyring[level].remove(names)