Fix some epydoc warnings
[ganeti-local] / lib / locking.py
index 901fdac..057cc84 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 
 """Module implementing the Ganeti locking code."""
 
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
 import os
 import select
 import threading
 import time
 import errno
+import weakref
+import logging
 
 from ganeti import errors
 from ganeti import utils
+from ganeti import compat
+
 
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
 
-def ssynchronized(lock, shared=0):
+
+def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
 
   Calls the function holding the given lock, either in exclusive or shared
   mode. It requires the passed lock to be a SharedLock (or support its
   semantics).
 
+  @type mylock: lockable object or string
+  @param mylock: lock to acquire or class member name of the lock to acquire
+
   """
   def wrap(fn):
     def sync_function(*args, **kwargs):
+      if isinstance(mylock, basestring):
+        assert args, "cannot ssynchronize on non-class method: self not found"
+        # args[0] is "self"
+        lock = getattr(args[0], mylock)
+      else:
+        lock = mylock
       lock.acquire(shared=shared)
       try:
         return fn(*args, **kwargs)
@@ -49,21 +70,71 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
-class _SingleActionPipeConditionWaiter(object):
-  """Callable helper class for _SingleActionPipeCondition.
+class RunningTimeout(object):
+  """Class to calculate remaining timeout when doing several operations.
+
+  """
+  __slots__ = [
+    "_allow_negative",
+    "_start_time",
+    "_time_fn",
+    "_timeout",
+    ]
+
+  def __init__(self, timeout, allow_negative, _time_fn=time.time):
+    """Initializes this class.
+
+    @type timeout: float
+    @param timeout: Timeout duration
+    @type allow_negative: bool
+    @param allow_negative: Whether to return values below zero
+    @param _time_fn: Time function for unittests
+
+    """
+    object.__init__(self)
+
+    if timeout is not None and timeout < 0.0:
+      raise ValueError("Timeout must not be negative")
+
+    self._timeout = timeout
+    self._allow_negative = allow_negative
+    self._time_fn = _time_fn
+
+    self._start_time = None
+
+  def Remaining(self):
+    """Returns the remaining timeout.
+
+    """
+    if self._timeout is None:
+      return None
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = self._time_fn()
+
+    # Calculate remaining time
+    remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+    if not self._allow_negative:
+      # Ensure timeout is always >= 0
+      return max(0.0, remaining_timeout)
+
+    return remaining_timeout
+
+
+class _SingleNotifyPipeConditionWaiter(object):
+  """Helper class for SingleNotifyPipeCondition
 
   """
   __slots__ = [
-    "_cond",
     "_fd",
     "_poller",
     ]
 
-  def __init__(self, cond, poller, fd):
-    """Initializes this class.
+  def __init__(self, poller, fd):
+    """Constructor for _SingleNotifyPipeConditionWaiter
 
-    @type cond: L{_SingleActionPipeCondition}
-    @param cond: Parent condition
     @type poller: select.poll
     @param poller: Poller object
     @type fd: int
@@ -71,8 +142,6 @@ class _SingleActionPipeConditionWaiter(object):
 
     """
     object.__init__(self)
-
-    self._cond = cond
     self._poller = poller
     self._fd = fd
 
@@ -83,10 +152,18 @@ class _SingleActionPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    start_time = time.time()
-    remaining_time = timeout
+    running_timeout = RunningTimeout(timeout, True)
+
+    while True:
+      remaining_time = running_timeout.Remaining()
+
+      if remaining_time is not None:
+        if remaining_time < 0.0:
+          break
+
+        # Our calculation uses seconds, poll() wants milliseconds
+        remaining_time *= 1000
 
-    while timeout is None or remaining_time > 0:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
@@ -98,110 +175,114 @@ class _SingleActionPipeConditionWaiter(object):
       if result and result[0][0] == self._fd:
         break
 
-      # Re-calculate timeout if necessary
-      if timeout is not None:
-        remaining_time = start_time + timeout - time.time()
-
-
-class _SingleActionPipeCondition(object):
-  """Wrapper around a pipe for usage inside conditions.
 
-  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
-  always allocated when constructing the class. Extra care is taken to always
-  close the file descriptors.
+class _BaseCondition(object):
+  """Base class containing common code for conditions.
 
-  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
-  notifications.
-
-  Warning: This class is designed to be used as the underlying component of a
-  locking condition, but is not by itself thread safe, and needs to be
-  protected by an external lock.
+  Some of this code is taken from python's threading module.
 
   """
   __slots__ = [
-    "_poller",
-    "_read_fd",
-    "_write_fd",
-    "_nwaiters",
+    "_lock",
+    "acquire",
+    "release",
+    "_is_owned",
+    "_acquire_restore",
+    "_release_save",
     ]
 
-  _waiter_class = _SingleActionPipeConditionWaiter
+  def __init__(self, lock):
+    """Constructor for _BaseCondition.
 
-  def __init__(self):
-    """Initializes this class.
+    @type lock: threading.Lock
+    @param lock: condition base lock
 
     """
     object.__init__(self)
 
-    self._nwaiters = 0
-
-    # Just assume the unpacking is successful, otherwise error handling gets
-    # very complicated.
-    (self._read_fd, self._write_fd) = os.pipe()
     try:
-      # The poller looks for closure of the write side
-      poller = select.poll()
-      poller.register(self._read_fd, select.POLLHUP)
-
-      self._poller = poller
-    except:
-      if self._read_fd is not None:
-        os.close(self._read_fd)
-      if self._write_fd is not None:
-        os.close(self._write_fd)
-      raise
+      self._release_save = lock._release_save
+    except AttributeError:
+      self._release_save = self._base_release_save
+    try:
+      self._acquire_restore = lock._acquire_restore
+    except AttributeError:
+      self._acquire_restore = self._base_acquire_restore
+    try:
+      self._is_owned = lock._is_owned
+    except AttributeError:
+      self._is_owned = self._base_is_owned
 
-    # There should be no code here anymore, otherwise the pipe file descriptors
-    # may be not be cleaned up properly in case of errors.
+    self._lock = lock
 
-  def StartWaiting(self):
-    """Return function to wait for notification.
+    # Export the lock's acquire() and release() methods
+    self.acquire = lock.acquire
+    self.release = lock.release
 
-    @rtype: L{_SingleActionPipeConditionWaiter}
-    @return: Function to wait for notification
+  def _base_is_owned(self):
+    """Check whether lock is owned by current thread.
 
     """
-    assert self._nwaiters >= 0
+    if self._lock.acquire(0):
+      self._lock.release()
+      return False
+    return True
 
-    if self._poller is None:
-      raise RuntimeError("Already cleaned up")
+  def _base_release_save(self):
+    self._lock.release()
 
-    # Create waiter function and increase number of waiters
-    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
-    self._nwaiters += 1
-    return wait_fn
+  def _base_acquire_restore(self, _):
+    self._lock.acquire()
 
-  def DoneWaiting(self):
-    """Decrement number of waiters and automatic cleanup.
+  def _check_owned(self):
+    """Raise an exception if the current thread doesn't own the lock.
 
-    Must be called after waiting for a notification.
+    """
+    if not self._is_owned():
+      raise RuntimeError("cannot work with un-aquired lock")
 
-    @rtype: bool
-    @return: Whether this was the last waiter
 
-    """
-    assert self._nwaiters > 0
+class SingleNotifyPipeCondition(_BaseCondition):
+  """Condition which can only be notified once.
 
-    self._nwaiters -= 1
+  This condition class uses pipes and poll, internally, to be able to wait for
+  notification with a timeout, without resorting to polling. It is almost
+  compatible with Python's threading.Condition, with the following differences:
+    - notifyAll can only be called once, and no wait can happen after that
+    - notify is not supported, only notifyAll
 
-    if self._nwaiters == 0:
-      self._Cleanup()
-      return True
+  """
 
-    return False
+  __slots__ = [
+    "_poller",
+    "_read_fd",
+    "_write_fd",
+    "_nwaiters",
+    "_notified",
+    ]
 
-  def notifyAll(self):
-    """Close the writing side of the pipe to notify all waiters.
+  _waiter_class = _SingleNotifyPipeConditionWaiter
 
-    """
-    if self._write_fd is None:
-      raise RuntimeError("Can only notify once")
+  def __init__(self, lock):
+    """Constructor for SingleNotifyPipeCondition
 
-    os.close(self._write_fd)
+    """
+    _BaseCondition.__init__(self, lock)
+    self._nwaiters = 0
+    self._notified = False
+    self._read_fd = None
     self._write_fd = None
+    self._poller = None
+
+  def _check_unnotified(self):
+    """Throws an exception if already notified.
+
+    """
+    if self._notified:
+      raise RuntimeError("cannot use already notified condition")
 
   def _Cleanup(self):
-    """Close all file descriptors.
+    """Cleanup open file descriptors, if any.
 
     """
     if self._read_fd is not None:
@@ -211,19 +292,51 @@ class _SingleActionPipeCondition(object):
     if self._write_fd is not None:
       os.close(self._write_fd)
       self._write_fd = None
-
     self._poller = None
 
-  def __del__(self):
-    """Called on object deletion.
+  def wait(self, timeout=None):
+    """Wait for a notification.
+
+    @type timeout: float or None
+    @param timeout: Waiting timeout (can be None)
+
+    """
+    self._check_owned()
+    self._check_unnotified()
+
+    self._nwaiters += 1
+    try:
+      if self._poller is None:
+        (self._read_fd, self._write_fd) = os.pipe()
+        self._poller = select.poll()
+        self._poller.register(self._read_fd, select.POLLHUP)
+
+      wait_fn = self._waiter_class(self._poller, self._read_fd)
+      state = self._release_save()
+      try:
+        # Wait for notification
+        wait_fn(timeout)
+      finally:
+        # Re-acquire lock
+        self._acquire_restore(state)
+    finally:
+      self._nwaiters -= 1
+      if self._nwaiters == 0:
+        self._Cleanup()
 
-    Ensure no file descriptors are left open.
+  def notifyAll(self): # pylint: disable-msg=C0103
+    """Close the writing side of the pipe to notify all waiters.
 
     """
-    self._Cleanup()
+    self._check_owned()
+    self._check_unnotified()
+    self._notified = True
+    if self._write_fd is not None:
+      os.close(self._write_fd)
+      self._write_fd = None
 
 
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
   """Group-only non-polling condition with counters.
 
   This condition class uses pipes and poll, internally, to be able to wait for
@@ -234,50 +347,19 @@ class _PipeCondition(object):
 
   """
   __slots__ = [
-    "_lock",
-    "_nwaiters",
-    "_pipe",
-    "acquire",
-    "release",
+    "_waiters",
+    "_single_condition",
     ]
 
-  _pipe_class = _SingleActionPipeCondition
+  _single_condition_class = SingleNotifyPipeCondition
 
   def __init__(self, lock):
     """Initializes this class.
 
     """
-    object.__init__(self)
-
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
-
-    self._lock = lock
-
-    # Export the lock's acquire() and release() methods
-    self.acquire = lock.acquire
-    self.release = lock.release
-
-    self._nwaiters = 0
-    self._pipe = None
-
-  def _is_owned(self):
-    """Check whether lock is owned by current thread.
-
-    """
-    if self._lock.acquire(0):
-      self._lock.release()
-      return False
-
-    return True
-
-  def _check_owned(self):
-    """Raise an exception if the current thread doesn't own the lock.
-
-    """
-    if not self._is_owned():
-      raise RuntimeError("cannot work with un-aquired lock")
+    _BaseCondition.__init__(self, lock)
+    self._waiters = set()
+    self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
     """Wait for a notification.
@@ -288,103 +370,40 @@ class _PipeCondition(object):
     """
     self._check_owned()
 
-    if not self._pipe:
-      self._pipe = self._pipe_class()
-
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    pipe = self._pipe
+    cond = self._single_condition
 
-    assert self._nwaiters >= 0
-    self._nwaiters += 1
+    self._waiters.add(threading.currentThread())
     try:
-      # Get function to wait on the pipe
-      wait_fn = pipe.StartWaiting()
-      try:
-        # Release lock while waiting
-        self.release()
-        try:
-          # Wait for notification
-          wait_fn(timeout)
-        finally:
-          # Re-acquire lock
-          self.acquire()
-      finally:
-        # Destroy pipe if this was the last waiter and the current pipe is
-        # still the same. The same pipe cannot be reused after cleanup.
-        if pipe.DoneWaiting() and pipe == self._pipe:
-          self._pipe = None
+      cond.wait(timeout)
     finally:
-      assert self._nwaiters > 0
-      self._nwaiters -= 1
+      self._check_owned()
+      self._waiters.remove(threading.currentThread())
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
     """
     self._check_owned()
+    self._single_condition.notifyAll()
+    self._single_condition = self._single_condition_class(self._lock)
 
-    # Notify and forget pipe. A new one will be created on the next call to
-    # wait.
-    if self._pipe is not None:
-      self._pipe.notifyAll()
-      self._pipe = None
-
-  def has_waiting(self):
-    """Returns whether there are active waiters.
+  def get_waiting(self):
+    """Returns a list of all waiting threads.
 
     """
     self._check_owned()
 
-    return bool(self._nwaiters)
-
-
-class _CountingCondition(object):
-  """Wrapper for Python's built-in threading.Condition class.
-
-  This wrapper keeps a count of active waiters. We can't access the internal
-  "__waiters" attribute of threading.Condition because it's not thread-safe.
-
-  """
-  __slots__ = [
-    "_cond",
-    "_nwaiters",
-    ]
-
-  def __init__(self, lock):
-    """Initializes this class.
-
-    """
-    object.__init__(self)
-    self._cond = threading.Condition(lock=lock)
-    self._nwaiters = 0
-
-  def notifyAll(self):
-    """Notifies the condition.
-
-    """
-    return self._cond.notifyAll()
-
-  def wait(self, timeout=None):
-    """Waits for the condition to be notified.
-
-    @type timeout: float or None
-    @param timeout: Timeout in seconds
-
-    """
-    assert self._nwaiters >= 0
-
-    self._nwaiters += 1
-    try:
-      return self._cond.wait(timeout=timeout)
-    finally:
-      self._nwaiters -= 1
+    return self._waiters
 
   def has_waiting(self):
     """Returns whether there are active waiters.
 
     """
-    return bool(self._nwaiters)
+    self._check_owned()
+
+    return bool(self._waiters)
 
 
 class SharedLock(object):
@@ -398,8 +417,12 @@ class SharedLock(object):
   the shared lock in the order they queued for it, just that they will
   eventually do so.
 
+  @type name: string
+  @ivar name: the name of the lock
+
   """
   __slots__ = [
+    "__weakref__",
     "__active_shr_c",
     "__inactive_shr_c",
     "__deleted",
@@ -407,16 +430,23 @@ class SharedLock(object):
     "__lock",
     "__pending",
     "__shr",
+    "name",
     ]
 
-  __condition_class = _CountingCondition
+  __condition_class = PipeCondition
 
-  def __init__(self):
+  def __init__(self, name, monitor=None):
     """Construct a new SharedLock.
 
+    @param name: the name of the lock
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register
+
     """
     object.__init__(self)
 
+    self.name = name
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -434,12 +464,76 @@ class SharedLock(object):
     # is this lock in the deleted state?
     self.__deleted = False
 
+    # Register with lock monitor
+    if monitor:
+      monitor.RegisterLock(self)
+
+  def GetInfo(self, fields):
+    """Retrieves information for querying locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    self.__lock.acquire()
+    try:
+      info = []
+
+      # Note: to avoid unintentional race conditions, no references to
+      # modifiable objects should be returned unless they were created in this
+      # function.
+      for fname in fields:
+        if fname == "name":
+          info.append(self.name)
+        elif fname == "mode":
+          if self.__deleted:
+            info.append("deleted")
+            assert not (self.__exc or self.__shr)
+          elif self.__exc:
+            info.append(_EXCLUSIVE_TEXT)
+          elif self.__shr:
+            info.append(_SHARED_TEXT)
+          else:
+            info.append(None)
+        elif fname == "owner":
+          if self.__exc:
+            owner = [self.__exc]
+          else:
+            owner = self.__shr
+
+          if owner:
+            assert not self.__deleted
+            info.append([i.getName() for i in owner])
+          else:
+            info.append(None)
+        elif fname == "pending":
+          data = []
+
+          for cond in self.__pending:
+            if cond in (self.__active_shr_c, self.__inactive_shr_c):
+              mode = _SHARED_TEXT
+            else:
+              mode = _EXCLUSIVE_TEXT
+
+            # This function should be fast as it runs with the lock held. Hence
+            # not using utils.NiceSort.
+            data.append((mode, sorted([i.getName()
+                                       for i in cond.get_waiting()])))
+
+          info.append(data)
+        else:
+          raise errors.OpExecError("Invalid query field '%s'" % fname)
+
+      return info
+    finally:
+      self.__lock.release()
+
   def __check_deleted(self):
     """Raises an exception if the lock has been deleted.
 
     """
     if self.__deleted:
-      raise errors.LockError("Deleted lock")
+      raise errors.LockError("Deleted lock %s" % self.name)
 
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?
@@ -520,7 +614,7 @@ class SharedLock(object):
     """
     return self.__pending[0] == cond
 
-  def __acquire_unlocked(self, shared=0, timeout=None):
+  def __acquire_unlocked(self, shared, timeout):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
@@ -531,7 +625,8 @@ class SharedLock(object):
     self.__check_deleted()
 
     # We cannot acquire the lock if we already have it
-    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+                                   " %s" % self.name)
 
     # Check whether someone else holds the lock or there are pending acquires.
     if not self.__pending and self.__can_acquire(shared):
@@ -574,18 +669,24 @@ class SharedLock(object):
 
     return False
 
-  def acquire(self, shared=0, timeout=None):
+  def acquire(self, shared=0, timeout=None, test_notify=None):
     """Acquire a shared lock.
 
-    @type shared: int
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
     """
     self.__lock.acquire()
     try:
+      # We already got the lock, notify now
+      if __debug__ and callable(test_notify):
+        test_notify()
+
       return self.__acquire_unlocked(shared, timeout)
     finally:
       self.__lock.release()
@@ -641,12 +742,17 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(timeout)
+        acquired = self.__acquire_unlocked(0, timeout)
+
+        assert self.__is_exclusive() and not self.__is_sharer(), \
+          "Lock wasn't acquired in exclusive mode"
 
       if acquired:
         self.__deleted = True
         self.__exc = None
 
+        assert not (self.__exc or self.__shr), "Found owner during deletion"
+
         # Notify all acquires. They'll throw an error.
         while self.__pending:
           self.__pending.pop().notifyAll()
@@ -655,12 +761,26 @@ class SharedLock(object):
     finally:
       self.__lock.release()
 
+  def _release_save(self):
+    shared = self.__is_sharer()
+    self.release()
+    return shared
+
+  def _acquire_restore(self, shared):
+    self.acquire(shared=shared)
+
 
 # Whenever we want to acquire a full LockSet we pass None as the value
 # to acquire.  Hide this behind this nicely named constant.
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
@@ -671,23 +791,35 @@ class LockSet:
 
   All the locks needed in the same set must be acquired together, though.
 
+  @type name: string
+  @ivar name: the name of the lockset
+
   """
-  def __init__(self, members=None):
+  def __init__(self, members, name, monitor=None):
     """Constructs a new LockSet.
 
+    @type members: list of strings
     @param members: initial members of the set
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register member locks
 
     """
+    assert members is not None, "members parameter is not a list"
+    self.name = name
+
+    # Lock monitor
+    self.__monitor = monitor
+
     # Used internally to guarantee coherency.
-    self.__lock = SharedLock()
+    self.__lock = SharedLock(name)
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
     self.__lockdict = {}
 
-    if members is not None:
-      for name in members:
-        self.__lockdict[name] = SharedLock()
+    for mname in members:
+      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+                                          monitor=monitor)
 
     # The owner dict contains the set of locks each thread owns. For
     # performance each thread can access its own key without a global lock on
@@ -698,6 +830,12 @@ class LockSet:
     # will be trouble.
     self.__owners = {}
 
+  def _GetLockName(self, mname):
+    """Returns the name for a member lock.
+
+    """
+    return "%s/%s" % (self.name, mname)
+
   def _is_owned(self):
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
@@ -716,6 +854,9 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
+    assert not (name is None and self.__lock._is_owned()), \
+           "Cannot hold internal lock when deleting owner status"
+
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
@@ -731,6 +872,14 @@ class LockSet:
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      lock = self.__lockdict[lname]
+      if lock._is_owned():
+        lock.release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
@@ -759,110 +908,162 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, blocking=1, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
         been deleted before we succeed. In this case none of the
         locks requested will be acquired.
 
     """
-    if not blocking:
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+                                  " (lockset %s)" % self.name)
 
-    if names is None:
-      # If no names are given acquire the whole set by not letting new names
-      # being added before we release, and getting the current list of names.
-      # Some of them may then be deleted later, but we'll cope with this.
-      #
-      # We'd like to acquire this lock in a shared way, as it's nice if
-      # everybody else can use the instances at the same time. If are acquiring
-      # them exclusively though they won't be able to do this anyway, though,
-      # so we'll get the list lock exclusively as well in order to be able to
-      # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
-      try:
-        # note we own the set-lock
-        self._add_owned()
-        names = self.__names()
-      except:
-        # We shouldn't have problems adding the lock to the owners list, but
-        # if we did we'll try to release this lock and re-raise exception.
-        # Of course something is going to be really wrong, after this.
-        self.__lock.release()
-        raise
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    running_timeout = RunningTimeout(timeout, False)
 
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
+      if names is not None:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+
+        return self.__acquire_inner(names, False, shared,
+                                    running_timeout.Remaining, test_notify)
+
       else:
-        names = sorted(names)
+        # If no names are given acquire the whole set by not letting new names
+        # being added before we release, and getting the current list of names.
+        # Some of them may then be deleted later, but we'll cope with this.
+        #
+        # We'd like to acquire this lock in a shared way, as it's nice if
+        # everybody else can use the instances at the same time. If are
+        # acquiring them exclusively though they won't be able to do this
+        # anyway, though, so we'll get the list lock exclusively as well in
+        # order to be able to do add() on the set while owning it.
+        if not self.__lock.acquire(shared=shared,
+                                   timeout=running_timeout.Remaining()):
+          raise _AcquireTimeout()
+        try:
+          # note we own the set-lock
+          self._add_owned()
+
+          return self.__acquire_inner(self.__names(), True, shared,
+                                      running_timeout.Remaining, test_notify)
+        except:
+          # We shouldn't have problems adding the lock to the owners list, but
+          # if we did we'll try to release this lock and re-raise exception.
+          # Of course something is going to be really wrong, after this.
+          self.__lock.release()
+          self._del_owned()
+          raise
+
+    except _AcquireTimeout:
+      return None
+
+  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+    """Inner logic for acquiring a number of locks.
+
+    @param names: Names of the locks to be acquired
+    @param want_all: Whether all locks in the set should be acquired
+    @param shared: Whether to acquire in shared mode
+    @param timeout_fn: Function returning remaining timeout
+    @param test_notify: Special callback function for unittesting
+
+    """
+    acquire_list = []
+
+    # First we look the locks up on __lockdict. We have no way of being sure
+    # they will still be there after, but this makes it a lot faster should
+    # just one of them be the already wrong. Using a sorted sequence to prevent
+    # deadlocks.
+    for lname in sorted(utils.UniqueSequence(names)):
+      try:
+        lock = self.__lockdict[lname] # raises KeyError if lock is not there
+      except KeyError:
+        if want_all:
+          # We are acquiring all the set, it doesn't matter if this particular
+          # element is not there anymore.
+          continue
+
+        raise errors.LockError("Non-existing lock %s in set %s" %
+                               (lname, self.name))
+
+      acquire_list.append((lname, lock))
+
+    # This will hold the locknames we effectively acquired.
+    acquired = set()
+
+    try:
+      # Now acquire_list contains a sorted list of resources and locks we
+      # want.  In order to get them we loop on this (private) list and
+      # acquire() them.  We gave no real guarantee they will still exist till
+      # this is done but .acquire() itself is safe and will alert us if the
+      # lock gets deleted.
+      for (lname, lock) in acquire_list:
+        if __debug__ and callable(test_notify):
+          test_notify_fn = lambda: test_notify(lname)
+        else:
+          test_notify_fn = None
+
+        timeout = timeout_fn()
 
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
         try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
+          # raises LockError if the lock was deleted
+          acq_success = lock.acquire(shared=shared, timeout=timeout,
+                                     test_notify=test_notify_fn)
+        except errors.LockError:
+          if want_all:
             # We are acquiring all the set, it doesn't matter if this
             # particular element is not there anymore.
             continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
+
+          raise errors.LockError("Non-existing lock %s in set %s" %
+                                 (lname, self.name))
+
+        if not acq_success:
+          # Couldn't get lock or timeout occurred
+          if timeout is None:
+            # This shouldn't happen as SharedLock.acquire(timeout=None) is
+            # blocking.
+            raise errors.LockError("Failed to get lock %s (set %s)" %
+                                   (lname, self.name))
+
+          raise _AcquireTimeout()
+
         try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
           # now the lock cannot be deleted, we have it!
           self._add_owned(name=lname)
           acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
+          # Of course something is going to be really wrong after this.
           if lock._is_owned():
             lock.release()
           raise
 
     except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
+      # Release all owned locks
+      self._release_and_delete_owned()
       raise
 
     return acquired
@@ -873,11 +1074,13 @@ class LockSet:
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
     """
-    assert self._is_owned(), "release() on lock set while not owner"
+    assert self._is_owned(), ("release() on lock set %s while not owner" %
+                              self.name)
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
@@ -888,8 +1091,8 @@ class LockSet:
     else:
       names = set(names)
       assert self._list_owned().issuperset(names), (
-               "release() on unheld resources %s" %
-               names.difference(self._list_owned()))
+               "release() on unheld resources %s (set %s)" %
+               (names.difference(self._list_owned()), self.name))
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
@@ -906,14 +1109,18 @@ class LockSet:
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
+    @type names: list of strings
     @param names: names of the new elements to add
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: pre-acquire the new resource?
+    @type shared: integer (0/1) used as a boolean
     @param shared: is the pre-acquisition shared?
 
     """
     # Check we don't already own locks at this level
     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
-      "Cannot add locks if the set is only partially owned, or shared"
+      ("Cannot add locks if the set %s is only partially owned, or shared" %
+       self.name)
 
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
@@ -932,10 +1139,11 @@ class LockSet:
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
         # concurrency even if the user doesn't want it.
-        raise errors.LockError("duplicate add() (%s)" % invalid_names)
+        raise errors.LockError("duplicate add(%s) on lockset %s" %
+                               (invalid_names, self.name))
 
       for lockname in names:
-        lock = SharedLock()
+        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
 
         if acquired:
           lock.acquire(shared=shared)
@@ -961,26 +1169,20 @@ class LockSet:
 
     return True
 
-  def remove(self, names, blocking=1):
+  def remove(self, names):
     """Remove elements from the lock set.
 
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
+    @type names: list of strings
     @param names: names of the resource to remove.
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported
-        yet unless you are already holding exclusively the locks)
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
         equal to the names list if we were holding all the locks
         exclusively
 
     """
-    if not blocking and not self._is_owned():
-      # We don't have non-blocking mode for now
-      raise NotImplementedError
-
     # Support passing in a single resource to remove rather than many
     if isinstance(names, basestring):
       names = [names]
@@ -989,7 +1191,8 @@ class LockSet:
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
     assert not self._is_owned() or self._list_owned().issuperset(names), (
-      "remove() on acquired lockset while not owning all elements")
+      "remove() on acquired lockset %s while not owning all elements" %
+      self.name)
 
     removed = []
 
@@ -1004,7 +1207,8 @@ class LockSet:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
-        assert not self._is_owned(), "remove failed while holding lockset"
+        assert not self._is_owned(), ("remove failed while holding lockset %s"
+                                      % self.name)
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
@@ -1077,13 +1281,24 @@ class GanetiLockManager:
 
     self.__class__._instance = self
 
+    self._monitor = LockMonitor()
+
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL]),
-      LEVEL_NODE: LockSet(nodes),
-      LEVEL_INSTANCE: LockSet(instances),
-    }
+      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instances, "instances",
+                              monitor=self._monitor),
+      }
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    See L{LockMonitor.QueryLocks}.
+
+    """
+    return self._monitor.QueryLocks(fields, sync)
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1116,9 +1331,9 @@ class GanetiLockManager:
     """
     # This way of checking only works if LEVELS[i] = i, which we check for in
     # the test cases.
-    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self):
+  def _BGL_owned(self): # pylint: disable-msg=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
@@ -1126,7 +1341,8 @@ class GanetiLockManager:
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
-  def _contains_BGL(self, level, names):
+  @staticmethod
+  def _contains_BGL(level, names): # pylint: disable-msg=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1135,17 +1351,19 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, blocking=1, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0):
     """Acquire a set of resource locks, at the same level.
 
-    @param level: the level at which the locks shall be acquired;
-        it must be a member of LEVELS.
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be acquired
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
-    @param blocking: whether to block while trying to acquire or to
-        operate in try-lock mode (this locking mode is not supported yet)
+    @type timeout: float
+    @param timeout: Maximum time to acquire all locks
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1164,8 +1382,7 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared,
-                                         blocking=blocking)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1173,8 +1390,9 @@ class GanetiLockManager:
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
-    @param level: the level at which the locks shall be released;
-        it must be a member of LEVELS
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be released
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -1183,7 +1401,9 @@ class GanetiLockManager:
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels")
+            " at upper levels (%r)" %
+            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+                              for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1191,10 +1411,13 @@ class GanetiLockManager:
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
-    @param level: the level at which the locks shall be added;
-        it must be a member of LEVELS_MOD.
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be added
+    @type names: list of strings
     @param names: names of the locks to acquire
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: whether to acquire the newly added locks
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether the acquisition will be shared
 
     """
@@ -1205,18 +1428,17 @@ class GanetiLockManager:
            " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
-  def remove(self, level, names, blocking=1):
+  def remove(self, level, names):
     """Remove locks from the specified level.
 
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
-    @param level: the level at which the locks shall be removed;
-        it must be a member of LEVELS_MOD
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be removed
+    @type names: list of strings
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
-    @param blocking: whether to block while trying to operate in
-        try-lock mode (this locking mode is not supported yet)
 
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
@@ -1228,4 +1450,60 @@ class GanetiLockManager:
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
-    return self.__keyring[level].remove(names, blocking=blocking)
+    return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Tracked locks. Weak references are used to avoid issues with circular
+    # references and deletion.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, lock):
+    """Registers a new lock.
+
+    """
+    logging.debug("Registering lock %s", lock.name)
+    assert lock not in self._locks, "Duplicate lock registration"
+    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
+           "Found duplicate lock name"
+    self._locks[lock] = None
+
+  @ssynchronized(_LOCK_ATTR)
+  def _GetLockInfo(self, fields):
+    """Get information from all locks while the monitor lock is held.
+
+    """
+    result = {}
+
+    for lock in self._locks.keys():
+      assert lock.name not in result, "Found duplicate lock name"
+      result[lock.name] = lock.GetInfo(fields)
+
+    return result
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @type sync: boolean
+    @param sync: Whether to operate in synchronous mode
+
+    """
+    if sync:
+      raise NotImplementedError("Synchronous queries are not implemented")
+
+    # Get all data without sorting
+    result = self._GetLockInfo(fields)
+
+    # Sort by name
+    return [result[name] for name in utils.NiceSort(result.keys())]