Fix burnin error when trying to grow a file volume
[ganeti-local] / lib / locking.py
index ea1f2cc..aeafd3f 100644 (file)
 
 """Module implementing the Ganeti locking code."""
 
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
 import os
 import select
 import threading
@@ -49,21 +54,71 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
-class _SingleActionPipeConditionWaiter(object):
-  """Callable helper class for _SingleActionPipeCondition.
+class RunningTimeout(object):
+  """Class to calculate remaining timeout when doing several operations.
+
+  """
+  __slots__ = [
+    "_allow_negative",
+    "_start_time",
+    "_time_fn",
+    "_timeout",
+    ]
+
+  def __init__(self, timeout, allow_negative, _time_fn=time.time):
+    """Initializes this class.
+
+    @type timeout: float
+    @param timeout: Timeout duration
+    @type allow_negative: bool
+    @param allow_negative: Whether to return values below zero
+    @param _time_fn: Time function for unittests
+
+    """
+    object.__init__(self)
+
+    if timeout is not None and timeout < 0.0:
+      raise ValueError("Timeout must not be negative")
+
+    self._timeout = timeout
+    self._allow_negative = allow_negative
+    self._time_fn = _time_fn
+
+    self._start_time = None
+
+  def Remaining(self):
+    """Returns the remaining timeout.
+
+    """
+    if self._timeout is None:
+      return None
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = self._time_fn()
+
+    # Calculate remaining time
+    remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+    if not self._allow_negative:
+      # Ensure timeout is always >= 0
+      return max(0.0, remaining_timeout)
+
+    return remaining_timeout
+
+
+class _SingleNotifyPipeConditionWaiter(object):
+  """Helper class for SingleNotifyPipeCondition
 
   """
   __slots__ = [
-    "_cond",
     "_fd",
     "_poller",
     ]
 
-  def __init__(self, cond, poller, fd):
-    """Initializes this class.
+  def __init__(self, poller, fd):
+    """Constructor for _SingleNotifyPipeConditionWaiter
 
-    @type cond: L{_SingleActionPipeCondition}
-    @param cond: Parent condition
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
@@ -71,8 +126,6 @@ class _SingleActionPipeConditionWaiter(object):
 
     """
     object.__init__(self)
-
-    self._cond = cond
     self._poller = poller
     self._fd = fd
 
@@ -83,10 +136,18 @@ class _SingleActionPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    start_time = time.time()
-    remaining_time = timeout
+    running_timeout = RunningTimeout(timeout, True)
+
+    while True:
+      remaining_time = running_timeout.Remaining()
+
+      if remaining_time is not None:
+        if remaining_time < 0.0:
+          break
+
+        # Our calculation uses seconds, poll() wants milliseconds
+        remaining_time *= 1000
 
-    while timeout is None or remaining_time > 0:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
@@ -98,110 +159,97 @@ class _SingleActionPipeConditionWaiter(object):
       if result and result[0][0] == self._fd:
         break
 
-      # Re-calculate timeout if necessary
-      if timeout is not None:
-        remaining_time = start_time + timeout - time.time()
-
 
-class _SingleActionPipeCondition(object):
-  """Wrapper around a pipe for usage inside conditions.
+class _BaseCondition(object):
+  """Base class containing common code for conditions.
 
-  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
-  always allocated when constructing the class. Extra care is taken to always
-  close the file descriptors.
-
-  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
-  notifications.
-
-  Warning: This class is designed to be used as the underlying component of a
-  locking condition, but is not by itself thread safe, and needs to be
-  protected by an external lock.
+  Some of this code is taken from python's threading module.
 
   """
   __slots__ = [
-    "_poller",
-    "_read_fd",
-    "_write_fd",
-    "_nwaiters",
+    "_lock",
+    "acquire",
+    "release",
     ]
 
-  _waiter_class = _SingleActionPipeConditionWaiter
+  def __init__(self, lock):
+    """Constructor for _BaseCondition.
 
-  def __init__(self):
-    """Initializes this class.
+    @type lock: threading.Lock
+    @param lock: condition base lock
 
     """
     object.__init__(self)
 
-    self._nwaiters = 0
-
-    # Just assume the unpacking is successful, otherwise error handling gets
-    # very complicated.
-    (self._read_fd, self._write_fd) = os.pipe()
-    try:
-      # The poller looks for closure of the write side
-      poller = select.poll()
-      poller.register(self._read_fd, select.POLLHUP)
-
-      self._poller = poller
-    except:
-      if self._read_fd is not None:
-        os.close(self._read_fd)
-      if self._write_fd is not None:
-        os.close(self._write_fd)
-      raise
+    # Recursive locks are not supported
+    assert not hasattr(lock, "_acquire_restore")
+    assert not hasattr(lock, "_release_save")
 
-    # There should be no code here anymore, otherwise the pipe file descriptors
-    # may be not be cleaned up properly in case of errors.
+    self._lock = lock
 
-  def StartWaiting(self):
-    """Return function to wait for notification.
+    # Export the lock's acquire() and release() methods
+    self.acquire = lock.acquire
+    self.release = lock.release
 
-    @rtype: L{_SingleActionPipeConditionWaiter}
-    @return: Function to wait for notification
+  def _is_owned(self):
+    """Check whether lock is owned by current thread.
 
     """
-    assert self._nwaiters >= 0
+    if self._lock.acquire(0):
+      self._lock.release()
+      return False
 
-    if self._poller is None:
-      raise RuntimeError("Already cleaned up")
+    return True
 
-    # Create waiter function and increase number of waiters
-    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
-    self._nwaiters += 1
-    return wait_fn
+  def _check_owned(self):
+    """Raise an exception if the current thread doesn't own the lock.
 
-  def DoneWaiting(self):
-    """Decrement number of waiters and automatic cleanup.
+    """
+    if not self._is_owned():
+      raise RuntimeError("cannot work with un-aquired lock")
 
-    Must be called after waiting for a notification.
 
-    @rtype: bool
-    @return: Whether this was the last waiter
+class SingleNotifyPipeCondition(_BaseCondition):
+  """Condition which can only be notified once.
 
-    """
-    assert self._nwaiters > 0
+  This condition class uses pipes and poll, internally, to be able to wait for
+  notification with a timeout, without resorting to polling. It is almost
+  compatible with Python's threading.Condition, with the following differences:
+    - notifyAll can only be called once, and no wait can happen after that
+    - notify is not supported, only notifyAll
 
-    self._nwaiters -= 1
+  """
 
-    if self._nwaiters == 0:
-      self._Cleanup()
-      return True
+  __slots__ = [
+    "_poller",
+    "_read_fd",
+    "_write_fd",
+    "_nwaiters",
+    "_notified",
+    ]
 
-    return False
+  _waiter_class = _SingleNotifyPipeConditionWaiter
 
-  def notifyAll(self):
-    """Close the writing side of the pipe to notify all waiters.
+  def __init__(self, lock):
+    """Constructor for SingleNotifyPipeCondition
 
     """
-    if self._write_fd is None:
-      raise RuntimeError("Can only notify once")
-
-    os.close(self._write_fd)
+    _BaseCondition.__init__(self, lock)
+    self._nwaiters = 0
+    self._notified = False
+    self._read_fd = None
     self._write_fd = None
+    self._poller = None
+
+  def _check_unnotified(self):
+    """Throws an exception if already notified.
+
+    """
+    if self._notified:
+      raise RuntimeError("cannot use already notified condition")
 
   def _Cleanup(self):
-    """Close all file descriptors.
+    """Cleanup open file descriptors, if any.
 
     """
     if self._read_fd is not None:
@@ -211,19 +259,51 @@ class _SingleActionPipeCondition(object):
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
-
     self._poller = None
 
-  def __del__(self):
-    """Called on object deletion.
+  def wait(self, timeout=None):
+    """Wait for a notification.
+
+    @type timeout: float or None
+    @param timeout: Waiting timeout (can be None)
+
+    """
+    self._check_owned()
+    self._check_unnotified()
+
+    self._nwaiters += 1
+    try:
+      if self._poller is None:
+        (self._read_fd, self._write_fd) = os.pipe()
+        self._poller = select.poll()
+        self._poller.register(self._read_fd, select.POLLHUP)
 
-    Ensure no file descriptors are left open.
+      wait_fn = self._waiter_class(self._poller, self._read_fd)
+      self.release()
+      try:
+        # Wait for notification
+        wait_fn(timeout)
+      finally:
+        # Re-acquire lock
+        self.acquire()
+    finally:
+      self._nwaiters -= 1
+      if self._nwaiters == 0:
+        self._Cleanup()
+
+  def notifyAll(self): # pylint: disable-msg=C0103
+    """Close the writing side of the pipe to notify all waiters.
 
     """
-    self._Cleanup()
+    self._check_owned()
+    self._check_unnotified()
+    self._notified = True
+    if self._write_fd is not None:
+      os.close(self._write_fd)
+      self._write_fd = None
 
 
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
@@ -234,50 +314,19 @@ class _PipeCondition(object):
 
   """
   __slots__ = [
-    "_lock",
     "_nwaiters",
-    "_pipe",
-    "acquire",
-    "release",
+    "_single_condition",
     ]
 
-  _pipe_class = _SingleActionPipeCondition
+  _single_condition_class = SingleNotifyPipeCondition
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
-    object.__init__(self)
-
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
-
-    self._lock = lock
-
-    # Export the lock's acquire() and release() methods
-    self.acquire = lock.acquire
-    self.release = lock.release
-
+    _BaseCondition.__init__(self, lock)
     self._nwaiters = 0
-    self._pipe = None
-
-  def _is_owned(self):
-    """Check whether lock is owned by current thread.
-
-    """
-    if self._lock.acquire(0):
-      self._lock.release()
-      return False
-
-    return True
-
-  def _check_owned(self):
-    """Raise an exception if the current thread doesn't own the lock.
-
-    """
-    if not self._is_owned():
-      raise RuntimeError("cannot work with un-aquired lock")
+    self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
     """Wait for a notification.
@@ -288,47 +337,25 @@ class _PipeCondition(object):
     """
     self._check_owned()
 
-    if not self._pipe:
-      self._pipe = self._pipe_class()
-
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    pipe = self._pipe
+    my_condition = self._single_condition
 
     assert self._nwaiters >= 0
     self._nwaiters += 1
     try:
-      # Get function to wait on the pipe
-      wait_fn = pipe.StartWaiting()
-      try:
-        # Release lock while waiting
-        self.release()
-        try:
-          # Wait for notification
-          wait_fn(timeout)
-        finally:
-          # Re-acquire lock
-          self.acquire()
-      finally:
-        # Destroy pipe if this was the last waiter and the current pipe is
-        # still the same. The same pipe cannot be reused after cleanup.
-        if pipe.DoneWaiting() and pipe == self._pipe:
-          self._pipe = None
+      my_condition.wait(timeout)
     finally:
       assert self._nwaiters > 0
       self._nwaiters -= 1
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
     """
     self._check_owned()
-
-    # Notify and forget pipe. A new one will be created on the next call to
-    # wait.
-    if self._pipe is not None:
-      self._pipe.notifyAll()
-      self._pipe = None
+    self._single_condition.notifyAll()
+    self._single_condition = self._single_condition_class(self._lock)
 
   def has_waiting(self):
     """Returns whether there are active waiters.
@@ -359,7 +386,7 @@ class _CountingCondition(object):
     self._cond = threading.Condition(lock=lock)
     self._nwaiters = 0
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notifies the condition.
 
     """
@@ -369,7 +396,7 @@ class _CountingCondition(object):
     """Waits for the condition to be notified.
 
     @type timeout: float or None
-    @param timeout: Timeout in seconds
+    @param timeout: Waiting timeout (can be None)
 
     """
     assert self._nwaiters >= 0
@@ -409,7 +436,7 @@ class SharedLock(object):
     "__shr",
     ]
 
-  __condition_class = _PipeCondition
+  __condition_class = PipeCondition
 
   def __init__(self):
     """Construct a new SharedLock.
@@ -520,7 +547,7 @@ class SharedLock(object):
     """
     return self.__pending[0] == cond
 
-  def __acquire_unlocked(self, shared=0, timeout=None):
+  def __acquire_unlocked(self, shared, timeout):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
@@ -574,18 +601,24 @@ class SharedLock(object):
 
     return False
 
-  def acquire(self, shared=0, timeout=None):
+  def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
-    @type shared: int
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
     """
     self.__lock.acquire()
     try:
+      # We already got the lock, notify now
+      if __debug__ and callable(test_notify):
+        test_notify()
+
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
@@ -641,7 +674,10 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(timeout)
+        acquired = self.__acquire_unlocked(0, timeout)
+
+        assert self.__is_exclusive() and not self.__is_sharer(), \
+          "Lock wasn't acquired in exclusive mode"
 
       if acquired:
         self.__deleted = True
@@ -661,6 +697,12 @@ class SharedLock(object):
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
@@ -675,6 +717,7 @@ class LockSet:
   def __init__(self, members=None):
     """Constructs a new LockSet.
 
+    @type members: list of strings
     @param members: initial members of the set
 
     """
@@ -716,6 +759,9 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
+    assert not (name is None and self.__lock._is_owned()), \
+           "Cannot hold internal lock when deleting owner status"
+
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
@@ -731,6 +777,14 @@ class LockSet:
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      lock = self.__lockdict[lname]
+      if lock._is_owned():
+        lock.release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
@@ -759,110 +813,158 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, blocking=1, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
-    if not blocking:
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    if names is None:
-      # If no names are given acquire the whole set by not letting new names
-      # being added before we release, and getting the current list of names.
-      # Some of them may then be deleted later, but we'll cope with this.
-      #
-      # We'd like to acquire this lock in a shared way, as it's nice if
-      # everybody else can use the instances at the same time. If are acquiring
-      # them exclusively though they won't be able to do this anyway, though,
-      # so we'll get the list lock exclusively as well in order to be able to
-      # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
-      try:
-        # note we own the set-lock
-        self._add_owned()
-        names = self.__names()
-      except:
-        # We shouldn't have problems adding the lock to the owners list, but
-        # if we did we'll try to release this lock and re-raise exception.
-        # Of course something is going to be really wrong, after this.
-        self.__lock.release()
-        raise
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    running_timeout = RunningTimeout(timeout, False)
 
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
+      if names is not None:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+
+        return self.__acquire_inner(names, False, shared,
+                                    running_timeout.Remaining, test_notify)
+
       else:
-        names = sorted(names)
+        # If no names are given acquire the whole set by not letting new names
+        # being added before we release, and getting the current list of names.
+        # Some of them may then be deleted later, but we'll cope with this.
+        #
+        # We'd like to acquire this lock in a shared way, as it's nice if
+        # everybody else can use the instances at the same time. If are
+        # acquiring them exclusively though they won't be able to do this
+        # anyway, though, so we'll get the list lock exclusively as well in
+        # order to be able to do add() on the set while owning it.
+        if not self.__lock.acquire(shared=shared,
+                                   timeout=running_timeout.Remaining()):
+          raise _AcquireTimeout()
+        try:
+          # note we own the set-lock
+          self._add_owned()
+
+          return self.__acquire_inner(self.__names(), True, shared,
+                                      running_timeout.Remaining, test_notify)
+        except:
+          # We shouldn't have problems adding the lock to the owners list, but
+          # if we did we'll try to release this lock and re-raise exception.
+          # Of course something is going to be really wrong, after this.
+          self.__lock.release()
+          self._del_owned()
+          raise
+
+    except _AcquireTimeout:
+      return None
+
+  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+    """Inner logic for acquiring a number of locks.
+
+    @param names: Names of the locks to be acquired
+    @param want_all: Whether all locks in the set should be acquired
+    @param shared: Whether to acquire in shared mode
+    @param timeout_fn: Function returning remaining timeout
+    @param test_notify: Special callback function for unittesting
+
+    """
+    acquire_list = []
+
+    # First we look the locks up on __lockdict. We have no way of being sure
+    # they will still be there after, but this makes it a lot faster should
+    # just one of them be the already wrong. Using a sorted sequence to prevent
+    # deadlocks.
+    for lname in sorted(utils.UniqueSequence(names)):
+      try:
+        lock = self.__lockdict[lname] # raises KeyError if lock is not there
+      except KeyError:
+        if want_all:
+          # We are acquiring all the set, it doesn't matter if this particular
+          # element is not there anymore.
+          continue
+
+        raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+      acquire_list.append((lname, lock))
+
+    # This will hold the locknames we effectively acquired.
+    acquired = set()
+
+    try:
+      # Now acquire_list contains a sorted list of resources and locks we
+      # want.  In order to get them we loop on this (private) list and
+      # acquire() them.  We gave no real guarantee they will still exist till
+      # this is done but .acquire() itself is safe and will alert us if the
+      # lock gets deleted.
+      for (lname, lock) in acquire_list:
+        if __debug__ and callable(test_notify):
+          test_notify_fn = lambda: test_notify(lname)
+        else:
+          test_notify_fn = None
+
+        timeout = timeout_fn()
 
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
         try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
+          # raises LockError if the lock was deleted
+          acq_success = lock.acquire(shared=shared, timeout=timeout,
+                                     test_notify=test_notify_fn)
+        except errors.LockError:
+          if want_all:
             # We are acquiring all the set, it doesn't matter if this
             # particular element is not there anymore.
             continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
+
+          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        if not acq_success:
+          # Couldn't get lock or timeout occurred
+          if timeout is None:
+            # This shouldn't happen as SharedLock.acquire(timeout=None) is
+            # blocking.
+            raise errors.LockError("Failed to get lock %s" % lname)
+
+          raise _AcquireTimeout()
+
         try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
           # now the lock cannot be deleted, we have it!
           self._add_owned(name=lname)
           acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
+          # Of course something is going to be really wrong after this.
           if lock._is_owned():
             lock.release()
           raise
 
     except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
+      # Release all owned locks
+      self._release_and_delete_owned()
       raise
 
     return acquired
@@ -873,6 +975,7 @@ class LockSet:
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
@@ -906,8 +1009,11 @@ class LockSet:
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
+    @type names: list of strings
     @param names: names of the new elements to add
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: pre-acquire the new resource?
+    @type shared: integer (0/1) used as a boolean
     @param shared: is the pre-acquisition shared?
 
     """
@@ -961,26 +1067,20 @@ class LockSet:
 
     return True
 
-  def remove(self, names, blocking=1):
+  def remove(self, names):
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
+    @type names: list of strings
     @param names: names of the resource to remove.
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported
-        yet unless you are already holding exclusively the locks)
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
         equal to the names list if we were holding all the locks
         exclusively
 
     """
-    if not blocking and not self._is_owned():
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
-
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
@@ -1118,7 +1218,7 @@ class GanetiLockManager:
     # the test cases.
     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self):
+  def _BGL_owned(self): # pylint: disable-msg=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
@@ -1126,7 +1226,8 @@ class GanetiLockManager:
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
-  def _contains_BGL(self, level, names):
+  @staticmethod
+  def _contains_BGL(level, names): # pylint: disable-msg=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1135,17 +1236,19 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, blocking=1, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
-    @param level: the level at which the locks shall be acquired;
-        it must be a member of LEVELS.
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be acquired
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float
+    @param timeout: Maximum time to acquire all locks
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1164,8 +1267,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared,
-                                         blocking=blocking)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1173,8 +1275,9 @@ class GanetiLockManager:
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
-    @param level: the level at which the locks shall be released;
-        it must be a member of LEVELS
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be released
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -1183,7 +1286,9 @@ class GanetiLockManager:
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels")
+            " at upper levels (%r)" %
+            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+                              for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1191,10 +1296,13 @@ class GanetiLockManager:
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
-    @param level: the level at which the locks shall be added;
-        it must be a member of LEVELS_MOD.
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be added
+    @type names: list of strings
     @param names: names of the locks to acquire
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: whether to acquire the newly added locks
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether the acquisition will be shared
 
     """
@@ -1205,18 +1313,17 @@ class GanetiLockManager:
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
-  def remove(self, level, names, blocking=1):
+  def remove(self, level, names):
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
-    @param level: the level at which the locks shall be removed;
-        it must be a member of LEVELS_MOD
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be removed
+    @type names: list of strings
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
-    @param blocking: whether to block while trying to operate in
-        try-lock mode (this locking mode is not supported yet)
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
@@ -1228,4 +1335,4 @@ class GanetiLockManager:
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
-    return self.__keyring[level].remove(names, blocking=blocking)
+    return self.__keyring[level].remove(names)