hv_chroot: remove hard-coded path constructs
[ganeti-local] / lib / locking.py
index ea1f2cc..aeafd3f 100644 (file)
 
 """Module implementing the Ganeti locking code."""
 
 
 """Module implementing the Ganeti locking code."""
 
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
 import os
 import select
 import threading
 import os
 import select
 import threading
@@ -49,21 +54,71 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
   return wrap
 
 
-class _SingleActionPipeConditionWaiter(object):
-  """Callable helper class for _SingleActionPipeCondition.
+class RunningTimeout(object):
+  """Class to calculate remaining timeout when doing several operations.
+
+  """
+  __slots__ = [
+    "_allow_negative",
+    "_start_time",
+    "_time_fn",
+    "_timeout",
+    ]
+
+  def __init__(self, timeout, allow_negative, _time_fn=time.time):
+    """Initializes this class.
+
+    @type timeout: float
+    @param timeout: Timeout duration
+    @type allow_negative: bool
+    @param allow_negative: Whether to return values below zero
+    @param _time_fn: Time function for unittests
+
+    """
+    object.__init__(self)
+
+    if timeout is not None and timeout < 0.0:
+      raise ValueError("Timeout must not be negative")
+
+    self._timeout = timeout
+    self._allow_negative = allow_negative
+    self._time_fn = _time_fn
+
+    self._start_time = None
+
+  def Remaining(self):
+    """Returns the remaining timeout.
+
+    """
+    if self._timeout is None:
+      return None
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = self._time_fn()
+
+    # Calculate remaining time
+    remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+    if not self._allow_negative:
+      # Ensure timeout is always >= 0
+      return max(0.0, remaining_timeout)
+
+    return remaining_timeout
+
+
+class _SingleNotifyPipeConditionWaiter(object):
+  """Helper class for SingleNotifyPipeCondition
 
   """
   __slots__ = [
 
   """
   __slots__ = [
-    "_cond",
     "_fd",
     "_poller",
     ]
 
     "_fd",
     "_poller",
     ]
 
-  def __init__(self, cond, poller, fd):
-    """Initializes this class.
+  def __init__(self, poller, fd):
+    """Constructor for _SingleNotifyPipeConditionWaiter
 
 
-    @type cond: L{_SingleActionPipeCondition}
-    @param cond: Parent condition
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
@@ -71,8 +126,6 @@ class _SingleActionPipeConditionWaiter(object):
 
     """
     object.__init__(self)
 
     """
     object.__init__(self)
-
-    self._cond = cond
     self._poller = poller
     self._fd = fd
 
     self._poller = poller
     self._fd = fd
 
@@ -83,10 +136,18 @@ class _SingleActionPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
     @param timeout: Timeout for waiting (can be None)
 
     """
-    start_time = time.time()
-    remaining_time = timeout
+    running_timeout = RunningTimeout(timeout, True)
+
+    while True:
+      remaining_time = running_timeout.Remaining()
+
+      if remaining_time is not None:
+        if remaining_time < 0.0:
+          break
+
+        # Our calculation uses seconds, poll() wants milliseconds
+        remaining_time *= 1000
 
 
-    while timeout is None or remaining_time > 0:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
@@ -98,110 +159,97 @@ class _SingleActionPipeConditionWaiter(object):
       if result and result[0][0] == self._fd:
         break
 
       if result and result[0][0] == self._fd:
         break
 
-      # Re-calculate timeout if necessary
-      if timeout is not None:
-        remaining_time = start_time + timeout - time.time()
-
 
 
-class _SingleActionPipeCondition(object):
-  """Wrapper around a pipe for usage inside conditions.
+class _BaseCondition(object):
+  """Base class containing common code for conditions.
 
 
-  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
-  always allocated when constructing the class. Extra care is taken to always
-  close the file descriptors.
-
-  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
-  notifications.
-
-  Warning: This class is designed to be used as the underlying component of a
-  locking condition, but is not by itself thread safe, and needs to be
-  protected by an external lock.
+  Some of this code is taken from python's threading module.
 
   """
   __slots__ = [
 
   """
   __slots__ = [
-    "_poller",
-    "_read_fd",
-    "_write_fd",
-    "_nwaiters",
+    "_lock",
+    "acquire",
+    "release",
     ]
 
     ]
 
-  _waiter_class = _SingleActionPipeConditionWaiter
+  def __init__(self, lock):
+    """Constructor for _BaseCondition.
 
 
-  def __init__(self):
-    """Initializes this class.
+    @type lock: threading.Lock
+    @param lock: condition base lock
 
     """
     object.__init__(self)
 
 
     """
     object.__init__(self)
 
-    self._nwaiters = 0
-
-    # Just assume the unpacking is successful, otherwise error handling gets
-    # very complicated.
-    (self._read_fd, self._write_fd) = os.pipe()
-    try:
-      # The poller looks for closure of the write side
-      poller = select.poll()
-      poller.register(self._read_fd, select.POLLHUP)
-
-      self._poller = poller
-    except:
-      if self._read_fd is not None:
-        os.close(self._read_fd)
-      if self._write_fd is not None:
-        os.close(self._write_fd)
-      raise
+    # Recursive locks are not supported
+    assert not hasattr(lock, "_acquire_restore")
+    assert not hasattr(lock, "_release_save")
 
 
-    # There should be no code here anymore, otherwise the pipe file descriptors
-    # may be not be cleaned up properly in case of errors.
+    self._lock = lock
 
 
-  def StartWaiting(self):
-    """Return function to wait for notification.
+    # Export the lock's acquire() and release() methods
+    self.acquire = lock.acquire
+    self.release = lock.release
 
 
-    @rtype: L{_SingleActionPipeConditionWaiter}
-    @return: Function to wait for notification
+  def _is_owned(self):
+    """Check whether lock is owned by current thread.
 
     """
 
     """
-    assert self._nwaiters >= 0
+    if self._lock.acquire(0):
+      self._lock.release()
+      return False
 
 
-    if self._poller is None:
-      raise RuntimeError("Already cleaned up")
+    return True
 
 
-    # Create waiter function and increase number of waiters
-    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
-    self._nwaiters += 1
-    return wait_fn
+  def _check_owned(self):
+    """Raise an exception if the current thread doesn't own the lock.
 
 
-  def DoneWaiting(self):
-    """Decrement number of waiters and automatic cleanup.
+    """
+    if not self._is_owned():
+      raise RuntimeError("cannot work with un-aquired lock")
 
 
-    Must be called after waiting for a notification.
 
 
-    @rtype: bool
-    @return: Whether this was the last waiter
+class SingleNotifyPipeCondition(_BaseCondition):
+  """Condition which can only be notified once.
 
 
-    """
-    assert self._nwaiters > 0
+  This condition class uses pipes and poll, internally, to be able to wait for
+  notification with a timeout, without resorting to polling. It is almost
+  compatible with Python's threading.Condition, with the following differences:
+    - notifyAll can only be called once, and no wait can happen after that
+    - notify is not supported, only notifyAll
 
 
-    self._nwaiters -= 1
+  """
 
 
-    if self._nwaiters == 0:
-      self._Cleanup()
-      return True
+  __slots__ = [
+    "_poller",
+    "_read_fd",
+    "_write_fd",
+    "_nwaiters",
+    "_notified",
+    ]
 
 
-    return False
+  _waiter_class = _SingleNotifyPipeConditionWaiter
 
 
-  def notifyAll(self):
-    """Close the writing side of the pipe to notify all waiters.
+  def __init__(self, lock):
+    """Constructor for SingleNotifyPipeCondition
 
     """
 
     """
-    if self._write_fd is None:
-      raise RuntimeError("Can only notify once")
-
-    os.close(self._write_fd)
+    _BaseCondition.__init__(self, lock)
+    self._nwaiters = 0
+    self._notified = False
+    self._read_fd = None
     self._write_fd = None
     self._write_fd = None
+    self._poller = None
+
+  def _check_unnotified(self):
+    """Throws an exception if already notified.
+
+    """
+    if self._notified:
+      raise RuntimeError("cannot use already notified condition")
 
   def _Cleanup(self):
 
   def _Cleanup(self):
-    """Close all file descriptors.
+    """Cleanup open file descriptors, if any.
 
     """
     if self._read_fd is not None:
 
     """
     if self._read_fd is not None:
@@ -211,19 +259,51 @@ class _SingleActionPipeCondition(object):
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
-
     self._poller = None
 
     self._poller = None
 
-  def __del__(self):
-    """Called on object deletion.
+  def wait(self, timeout=None):
+    """Wait for a notification.
+
+    @type timeout: float or None
+    @param timeout: Waiting timeout (can be None)
+
+    """
+    self._check_owned()
+    self._check_unnotified()
+
+    self._nwaiters += 1
+    try:
+      if self._poller is None:
+        (self._read_fd, self._write_fd) = os.pipe()
+        self._poller = select.poll()
+        self._poller.register(self._read_fd, select.POLLHUP)
 
 
-    Ensure no file descriptors are left open.
+      wait_fn = self._waiter_class(self._poller, self._read_fd)
+      self.release()
+      try:
+        # Wait for notification
+        wait_fn(timeout)
+      finally:
+        # Re-acquire lock
+        self.acquire()
+    finally:
+      self._nwaiters -= 1
+      if self._nwaiters == 0:
+        self._Cleanup()
+
+  def notifyAll(self): # pylint: disable-msg=C0103
+    """Close the writing side of the pipe to notify all waiters.
 
     """
 
     """
-    self._Cleanup()
+    self._check_owned()
+    self._check_unnotified()
+    self._notified = True
+    if self._write_fd is not None:
+      os.close(self._write_fd)
+      self._write_fd = None
 
 
 
 
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
@@ -234,50 +314,19 @@ class _PipeCondition(object):
 
   """
   __slots__ = [
 
   """
   __slots__ = [
-    "_lock",
     "_nwaiters",
     "_nwaiters",
-    "_pipe",
-    "acquire",
-    "release",
+    "_single_condition",
     ]
 
     ]
 
-  _pipe_class = _SingleActionPipeCondition
+  _single_condition_class = SingleNotifyPipeCondition
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
-    object.__init__(self)
-
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
-
-    self._lock = lock
-
-    # Export the lock's acquire() and release() methods
-    self.acquire = lock.acquire
-    self.release = lock.release
-
+    _BaseCondition.__init__(self, lock)
     self._nwaiters = 0
     self._nwaiters = 0
-    self._pipe = None
-
-  def _is_owned(self):
-    """Check whether lock is owned by current thread.
-
-    """
-    if self._lock.acquire(0):
-      self._lock.release()
-      return False
-
-    return True
-
-  def _check_owned(self):
-    """Raise an exception if the current thread doesn't own the lock.
-
-    """
-    if not self._is_owned():
-      raise RuntimeError("cannot work with un-aquired lock")
+    self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
     """Wait for a notification.
 
   def wait(self, timeout=None):
     """Wait for a notification.
@@ -288,47 +337,25 @@ class _PipeCondition(object):
     """
     self._check_owned()
 
     """
     self._check_owned()
 
-    if not self._pipe:
-      self._pipe = self._pipe_class()
-
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    pipe = self._pipe
+    my_condition = self._single_condition
 
     assert self._nwaiters >= 0
     self._nwaiters += 1
     try:
 
     assert self._nwaiters >= 0
     self._nwaiters += 1
     try:
-      # Get function to wait on the pipe
-      wait_fn = pipe.StartWaiting()
-      try:
-        # Release lock while waiting
-        self.release()
-        try:
-          # Wait for notification
-          wait_fn(timeout)
-        finally:
-          # Re-acquire lock
-          self.acquire()
-      finally:
-        # Destroy pipe if this was the last waiter and the current pipe is
-        # still the same. The same pipe cannot be reused after cleanup.
-        if pipe.DoneWaiting() and pipe == self._pipe:
-          self._pipe = None
+      my_condition.wait(timeout)
     finally:
       assert self._nwaiters > 0
       self._nwaiters -= 1
 
     finally:
       assert self._nwaiters > 0
       self._nwaiters -= 1
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
     """
     self._check_owned()
     """Notify all currently waiting threads.
 
     """
     self._check_owned()
-
-    # Notify and forget pipe. A new one will be created on the next call to
-    # wait.
-    if self._pipe is not None:
-      self._pipe.notifyAll()
-      self._pipe = None
+    self._single_condition.notifyAll()
+    self._single_condition = self._single_condition_class(self._lock)
 
   def has_waiting(self):
     """Returns whether there are active waiters.
 
   def has_waiting(self):
     """Returns whether there are active waiters.
@@ -359,7 +386,7 @@ class _CountingCondition(object):
     self._cond = threading.Condition(lock=lock)
     self._nwaiters = 0
 
     self._cond = threading.Condition(lock=lock)
     self._nwaiters = 0
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notifies the condition.
 
     """
     """Notifies the condition.
 
     """
@@ -369,7 +396,7 @@ class _CountingCondition(object):
     """Waits for the condition to be notified.
 
     @type timeout: float or None
     """Waits for the condition to be notified.
 
     @type timeout: float or None
-    @param timeout: Timeout in seconds
+    @param timeout: Waiting timeout (can be None)
 
     """
     assert self._nwaiters >= 0
 
     """
     assert self._nwaiters >= 0
@@ -409,7 +436,7 @@ class SharedLock(object):
     "__shr",
     ]
 
     "__shr",
     ]
 
-  __condition_class = _PipeCondition
+  __condition_class = PipeCondition
 
   def __init__(self):
     """Construct a new SharedLock.
 
   def __init__(self):
     """Construct a new SharedLock.
@@ -520,7 +547,7 @@ class SharedLock(object):
     """
     return self.__pending[0] == cond
 
     """
     return self.__pending[0] == cond
 
-  def __acquire_unlocked(self, shared=0, timeout=None):
+  def __acquire_unlocked(self, shared, timeout):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
@@ -574,18 +601,24 @@ class SharedLock(object):
 
     return False
 
 
     return False
 
-  def acquire(self, shared=0, timeout=None):
+  def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
     """Acquire a shared lock.
 
-    @type shared: int
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
     """
     self.__lock.acquire()
     try:
 
     """
     self.__lock.acquire()
     try:
+      # We already got the lock, notify now
+      if __debug__ and callable(test_notify):
+        test_notify()
+
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
@@ -641,7 +674,10 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(timeout)
+        acquired = self.__acquire_unlocked(0, timeout)
+
+        assert self.__is_exclusive() and not self.__is_sharer(), \
+          "Lock wasn't acquired in exclusive mode"
 
       if acquired:
         self.__deleted = True
 
       if acquired:
         self.__deleted = True
@@ -661,6 +697,12 @@ class SharedLock(object):
 ALL_SET = None
 
 
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
 class LockSet:
   """Implements a set of locks.
 
@@ -675,6 +717,7 @@ class LockSet:
   def __init__(self, members=None):
     """Constructs a new LockSet.
 
   def __init__(self, members=None):
     """Constructs a new LockSet.
 
+    @type members: list of strings
     @param members: initial members of the set
 
     """
     @param members: initial members of the set
 
     """
@@ -716,6 +759,9 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
+    assert not (name is None and self.__lock._is_owned()), \
+           "Cannot hold internal lock when deleting owner status"
+
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
@@ -731,6 +777,14 @@ class LockSet:
     else:
       return set()
 
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      lock = self.__lockdict[lname]
+      if lock._is_owned():
+        lock.release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
   def __names(self):
     """Return the current set of names.
 
@@ -759,110 +813,158 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, blocking=1, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
     """Acquire a set of resource locks.
 
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
-    if not blocking:
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    if names is None:
-      # If no names are given acquire the whole set by not letting new names
-      # being added before we release, and getting the current list of names.
-      # Some of them may then be deleted later, but we'll cope with this.
-      #
-      # We'd like to acquire this lock in a shared way, as it's nice if
-      # everybody else can use the instances at the same time. If are acquiring
-      # them exclusively though they won't be able to do this anyway, though,
-      # so we'll get the list lock exclusively as well in order to be able to
-      # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
-      try:
-        # note we own the set-lock
-        self._add_owned()
-        names = self.__names()
-      except:
-        # We shouldn't have problems adding the lock to the owners list, but
-        # if we did we'll try to release this lock and re-raise exception.
-        # Of course something is going to be really wrong, after this.
-        self.__lock.release()
-        raise
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    running_timeout = RunningTimeout(timeout, False)
 
     try:
 
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
+      if names is not None:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+
+        return self.__acquire_inner(names, False, shared,
+                                    running_timeout.Remaining, test_notify)
+
       else:
       else:
-        names = sorted(names)
+        # If no names are given acquire the whole set by not letting new names
+        # being added before we release, and getting the current list of names.
+        # Some of them may then be deleted later, but we'll cope with this.
+        #
+        # We'd like to acquire this lock in a shared way, as it's nice if
+        # everybody else can use the instances at the same time. If are
+        # acquiring them exclusively though they won't be able to do this
+        # anyway, though, so we'll get the list lock exclusively as well in
+        # order to be able to do add() on the set while owning it.
+        if not self.__lock.acquire(shared=shared,
+                                   timeout=running_timeout.Remaining()):
+          raise _AcquireTimeout()
+        try:
+          # note we own the set-lock
+          self._add_owned()
+
+          return self.__acquire_inner(self.__names(), True, shared,
+                                      running_timeout.Remaining, test_notify)
+        except:
+          # We shouldn't have problems adding the lock to the owners list, but
+          # if we did we'll try to release this lock and re-raise exception.
+          # Of course something is going to be really wrong, after this.
+          self.__lock.release()
+          self._del_owned()
+          raise
+
+    except _AcquireTimeout:
+      return None
+
+  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+    """Inner logic for acquiring a number of locks.
+
+    @param names: Names of the locks to be acquired
+    @param want_all: Whether all locks in the set should be acquired
+    @param shared: Whether to acquire in shared mode
+    @param timeout_fn: Function returning remaining timeout
+    @param test_notify: Special callback function for unittesting
+
+    """
+    acquire_list = []
+
+    # First we look the locks up on __lockdict. We have no way of being sure
+    # they will still be there after, but this makes it a lot faster should
+    # just one of them be the already wrong. Using a sorted sequence to prevent
+    # deadlocks.
+    for lname in sorted(utils.UniqueSequence(names)):
+      try:
+        lock = self.__lockdict[lname] # raises KeyError if lock is not there
+      except KeyError:
+        if want_all:
+          # We are acquiring all the set, it doesn't matter if this particular
+          # element is not there anymore.
+          continue
+
+        raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+      acquire_list.append((lname, lock))
+
+    # This will hold the locknames we effectively acquired.
+    acquired = set()
+
+    try:
+      # Now acquire_list contains a sorted list of resources and locks we
+      # want.  In order to get them we loop on this (private) list and
+      # acquire() them.  We gave no real guarantee they will still exist till
+      # this is done but .acquire() itself is safe and will alert us if the
+      # lock gets deleted.
+      for (lname, lock) in acquire_list:
+        if __debug__ and callable(test_notify):
+          test_notify_fn = lambda: test_notify(lname)
+        else:
+          test_notify_fn = None
+
+        timeout = timeout_fn()
 
 
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
         try:
         try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
+          # raises LockError if the lock was deleted
+          acq_success = lock.acquire(shared=shared, timeout=timeout,
+                                     test_notify=test_notify_fn)
+        except errors.LockError:
+          if want_all:
             # We are acquiring all the set, it doesn't matter if this
             # particular element is not there anymore.
             continue
             # We are acquiring all the set, it doesn't matter if this
             # particular element is not there anymore.
             continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
+
+          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        if not acq_success:
+          # Couldn't get lock or timeout occurred
+          if timeout is None:
+            # This shouldn't happen as SharedLock.acquire(timeout=None) is
+            # blocking.
+            raise errors.LockError("Failed to get lock %s" % lname)
+
+          raise _AcquireTimeout()
+
         try:
         try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
           # now the lock cannot be deleted, we have it!
           self._add_owned(name=lname)
           acquired.add(lname)
           # now the lock cannot be deleted, we have it!
           self._add_owned(name=lname)
           acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
+          # Of course something is going to be really wrong after this.
           if lock._is_owned():
             lock.release()
           raise
 
     except:
           if lock._is_owned():
             lock.release()
           raise
 
     except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
+      # Release all owned locks
+      self._release_and_delete_owned()
       raise
 
     return acquired
       raise
 
     return acquired
@@ -873,6 +975,7 @@ class LockSet:
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
@@ -906,8 +1009,11 @@ class LockSet:
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
+    @type names: list of strings
     @param names: names of the new elements to add
     @param names: names of the new elements to add
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: pre-acquire the new resource?
     @param acquired: pre-acquire the new resource?
+    @type shared: integer (0/1) used as a boolean
     @param shared: is the pre-acquisition shared?
 
     """
     @param shared: is the pre-acquisition shared?
 
     """
@@ -961,26 +1067,20 @@ class LockSet:
 
     return True
 
 
     return True
 
-  def remove(self, names, blocking=1):
+  def remove(self, names):
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
+    @type names: list of strings
     @param names: names of the resource to remove.
     @param names: names of the resource to remove.
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported
-        yet unless you are already holding exclusively the locks)
 
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
         equal to the names list if we were holding all the locks
         exclusively
 
     """
         equal to the names list if we were holding all the locks
         exclusively
 
     """
-    if not blocking and not self._is_owned():
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
-
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
@@ -1118,7 +1218,7 @@ class GanetiLockManager:
     # the test cases.
     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
     # the test cases.
     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self):
+  def _BGL_owned(self): # pylint: disable-msg=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
@@ -1126,7 +1226,8 @@ class GanetiLockManager:
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
-  def _contains_BGL(self, level, names):
+  @staticmethod
+  def _contains_BGL(level, names): # pylint: disable-msg=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1135,17 +1236,19 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, blocking=1, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
     """Acquire a set of resource locks, at the same level.
 
-    @param level: the level at which the locks shall be acquired;
-        it must be a member of LEVELS.
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be acquired
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float
+    @param timeout: Maximum time to acquire all locks
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1164,8 +1267,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared,
-                                         blocking=blocking)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1173,8 +1275,9 @@ class GanetiLockManager:
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
-    @param level: the level at which the locks shall be released;
-        it must be a member of LEVELS
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be released
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -1183,7 +1286,9 @@ class GanetiLockManager:
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels")
+            " at upper levels (%r)" %
+            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+                              for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1191,10 +1296,13 @@ class GanetiLockManager:
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
-    @param level: the level at which the locks shall be added;
-        it must be a member of LEVELS_MOD.
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be added
+    @type names: list of strings
     @param names: names of the locks to acquire
     @param names: names of the locks to acquire
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: whether to acquire the newly added locks
     @param acquired: whether to acquire the newly added locks
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether the acquisition will be shared
 
     """
     @param shared: whether the acquisition will be shared
 
     """
@@ -1205,18 +1313,17 @@ class GanetiLockManager:
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
-  def remove(self, level, names, blocking=1):
+  def remove(self, level, names):
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
-    @param level: the level at which the locks shall be removed;
-        it must be a member of LEVELS_MOD
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be removed
+    @type names: list of strings
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
-    @param blocking: whether to block while trying to operate in
-        try-lock mode (this locking mode is not supported yet)
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
@@ -1228,4 +1335,4 @@ class GanetiLockManager:
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
-    return self.__keyring[level].remove(names, blocking=blocking)
+    return self.__keyring[level].remove(names)