jqueue/gnt-job: Add job priority fields for display
[ganeti-local] / lib / locking.py
index f13d7b2..d89388c 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
-# Disable "Invalid name ..." message
-# pylint: disable-msg=C0103
-
 """Module implementing the Ganeti locking code."""
 
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
 import os
 import select
 import threading
 import time
 import errno
+import weakref
+import logging
+import heapq
 
 from ganeti import errors
 from ganeti import utils
+from ganeti import compat
+
+
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+
+_DEFAULT_PRIORITY = 0
 
 
-def ssynchronized(lock, shared=0):
+def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
 
   Calls the function holding the given lock, either in exclusive or shared
   mode. It requires the passed lock to be a SharedLock (or support its
   semantics).
 
+  @type mylock: lockable object or string
+  @param mylock: lock to acquire or class member name of the lock to acquire
+
   """
   def wrap(fn):
     def sync_function(*args, **kwargs):
+      if isinstance(mylock, basestring):
+        assert args, "cannot ssynchronize on non-class method: self not found"
+        # args[0] is "self"
+        lock = getattr(args[0], mylock)
+      else:
+        lock = mylock
       lock.acquire(shared=shared)
       try:
         return fn(*args, **kwargs)
@@ -52,6 +73,59 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
+class RunningTimeout(object):
+  """Class to calculate remaining timeout when doing several operations.
+
+  """
+  __slots__ = [
+    "_allow_negative",
+    "_start_time",
+    "_time_fn",
+    "_timeout",
+    ]
+
+  def __init__(self, timeout, allow_negative, _time_fn=time.time):
+    """Initializes this class.
+
+    @type timeout: float
+    @param timeout: Timeout duration
+    @type allow_negative: bool
+    @param allow_negative: Whether to return values below zero
+    @param _time_fn: Time function for unittests
+
+    """
+    object.__init__(self)
+
+    if timeout is not None and timeout < 0.0:
+      raise ValueError("Timeout must not be negative")
+
+    self._timeout = timeout
+    self._allow_negative = allow_negative
+    self._time_fn = _time_fn
+
+    self._start_time = None
+
+  def Remaining(self):
+    """Returns the remaining timeout.
+
+    """
+    if self._timeout is None:
+      return None
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = self._time_fn()
+
+    # Calculate remaining time
+    remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+    if not self._allow_negative:
+      # Ensure timeout is always >= 0
+      return max(0.0, remaining_timeout)
+
+    return remaining_timeout
+
+
 class _SingleNotifyPipeConditionWaiter(object):
   """Helper class for SingleNotifyPipeCondition
 
@@ -81,10 +155,18 @@ class _SingleNotifyPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    start_time = time.time()
-    remaining_time = timeout
+    running_timeout = RunningTimeout(timeout, True)
+
+    while True:
+      remaining_time = running_timeout.Remaining()
+
+      if remaining_time is not None:
+        if remaining_time < 0.0:
+          break
+
+        # Our calculation uses seconds, poll() wants milliseconds
+        remaining_time *= 1000
 
-    while timeout is None or remaining_time > 0:
       try:
         result = self._poller.poll(remaining_time)
       except EnvironmentError, err:
@@ -96,10 +178,6 @@ class _SingleNotifyPipeConditionWaiter(object):
       if result and result[0][0] == self._fd:
         break
 
-      # Re-calculate timeout if necessary
-      if timeout is not None:
-        remaining_time = start_time + timeout - time.time()
-
 
 class _BaseCondition(object):
   """Base class containing common code for conditions.
@@ -111,6 +189,9 @@ class _BaseCondition(object):
     "_lock",
     "acquire",
     "release",
+    "_is_owned",
+    "_acquire_restore",
+    "_release_save",
     ]
 
   def __init__(self, lock):
@@ -122,9 +203,18 @@ class _BaseCondition(object):
     """
     object.__init__(self)
 
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
+    try:
+      self._release_save = lock._release_save
+    except AttributeError:
+      self._release_save = self._base_release_save
+    try:
+      self._acquire_restore = lock._acquire_restore
+    except AttributeError:
+      self._acquire_restore = self._base_acquire_restore
+    try:
+      self._is_owned = lock._is_owned
+    except AttributeError:
+      self._is_owned = self._base_is_owned
 
     self._lock = lock
 
@@ -132,16 +222,21 @@ class _BaseCondition(object):
     self.acquire = lock.acquire
     self.release = lock.release
 
-  def _is_owned(self):
+  def _base_is_owned(self):
     """Check whether lock is owned by current thread.
 
     """
     if self._lock.acquire(0):
       self._lock.release()
       return False
-
     return True
 
+  def _base_release_save(self):
+    self._lock.release()
+
+  def _base_acquire_restore(self, _):
+    self._lock.acquire()
+
   def _check_owned(self):
     """Raise an exception if the current thread doesn't own the lock.
 
@@ -161,7 +256,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
 
   """
 
-  __slots__ = _BaseCondition.__slots__ + [
+  __slots__ = [
     "_poller",
     "_read_fd",
     "_write_fd",
@@ -220,19 +315,19 @@ class SingleNotifyPipeCondition(_BaseCondition):
         self._poller.register(self._read_fd, select.POLLHUP)
 
       wait_fn = self._waiter_class(self._poller, self._read_fd)
-      self.release()
+      state = self._release_save()
       try:
         # Wait for notification
         wait_fn(timeout)
       finally:
         # Re-acquire lock
-        self.acquire()
+        self._acquire_restore(state)
     finally:
       self._nwaiters -= 1
       if self._nwaiters == 0:
         self._Cleanup()
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Close the writing side of the pipe to notify all waiters.
 
     """
@@ -254,8 +349,8 @@ class PipeCondition(_BaseCondition):
   there are any waiting threads.
 
   """
-  __slots__ = _BaseCondition.__slots__ + [
-    "_nwaiters",
+  __slots__ = [
+    "_waiters",
     "_single_condition",
     ]
 
@@ -266,7 +361,7 @@ class PipeCondition(_BaseCondition):
 
     """
     _BaseCondition.__init__(self, lock)
-    self._nwaiters = 0
+    self._waiters = set()
     self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
@@ -280,17 +375,16 @@ class PipeCondition(_BaseCondition):
 
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    my_condition = self._single_condition
+    cond = self._single_condition
 
-    assert self._nwaiters >= 0
-    self._nwaiters += 1
+    self._waiters.add(threading.currentThread())
     try:
-      my_condition.wait(timeout)
+      cond.wait(timeout)
     finally:
-      assert self._nwaiters > 0
-      self._nwaiters -= 1
+      self._check_owned()
+      self._waiters.remove(threading.currentThread())
 
-  def notifyAll(self):
+  def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
     """
@@ -298,102 +392,88 @@ class PipeCondition(_BaseCondition):
     self._single_condition.notifyAll()
     self._single_condition = self._single_condition_class(self._lock)
 
-  def has_waiting(self):
-    """Returns whether there are active waiters.
+  def get_waiting(self):
+    """Returns a list of all waiting threads.
 
     """
     self._check_owned()
 
-    return bool(self._nwaiters)
+    return self._waiters
 
+  def has_waiting(self):
+    """Returns whether there are active waiters.
 
-class _CountingCondition(object):
-  """Wrapper for Python's built-in threading.Condition class.
+    """
+    self._check_owned()
 
-  This wrapper keeps a count of active waiters. We can't access the internal
-  "__waiters" attribute of threading.Condition because it's not thread-safe.
+    return bool(self._waiters)
 
-  """
+
+class _PipeConditionWithMode(PipeCondition):
   __slots__ = [
-    "_cond",
-    "_nwaiters",
+    "shared",
     ]
 
-  def __init__(self, lock):
+  def __init__(self, lock, shared):
     """Initializes this class.
 
     """
-    object.__init__(self)
-    self._cond = threading.Condition(lock=lock)
-    self._nwaiters = 0
-
-  def notifyAll(self):
-    """Notifies the condition.
-
-    """
-    return self._cond.notifyAll()
-
-  def wait(self, timeout=None):
-    """Waits for the condition to be notified.
-
-    @type timeout: float or None
-    @param timeout: Waiting timeout (can be None)
-
-    """
-    assert self._nwaiters >= 0
-
-    self._nwaiters += 1
-    try:
-      return self._cond.wait(timeout=timeout)
-    finally:
-      self._nwaiters -= 1
-
-  def has_waiting(self):
-    """Returns whether there are active waiters.
-
-    """
-    return bool(self._nwaiters)
+    self.shared = shared
+    PipeCondition.__init__(self, lock)
 
 
 class SharedLock(object):
   """Implements a shared lock.
 
-  Multiple threads can acquire the lock in a shared way, calling
-  acquire_shared().  In order to acquire the lock in an exclusive way threads
-  can call acquire_exclusive().
+  Multiple threads can acquire the lock in a shared way by calling
+  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+  threads can call C{acquire(shared=0)}.
+
+  Notes on data structures: C{__pending} contains a priority queue (heapq) of
+  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
+  ...]}. Each per-priority queue contains a normal in-order list of conditions
+  to be notified when the lock can be acquired. Shared locks are grouped
+  together by priority and the condition for them is stored in
+  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
+  references for the per-priority queues indexed by priority for faster access.
 
-  The lock prevents starvation but does not guarantee that threads will acquire
-  the shared lock in the order they queued for it, just that they will
-  eventually do so.
+  @type name: string
+  @ivar name: the name of the lock
 
   """
   __slots__ = [
-    "__active_shr_c",
-    "__inactive_shr_c",
+    "__weakref__",
     "__deleted",
     "__exc",
     "__lock",
     "__pending",
+    "__pending_by_prio",
+    "__pending_shared",
     "__shr",
+    "name",
     ]
 
-  __condition_class = PipeCondition
+  __condition_class = _PipeConditionWithMode
 
-  def __init__(self):
+  def __init__(self, name, monitor=None):
     """Construct a new SharedLock.
 
+    @param name: the name of the lock
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register
+
     """
     object.__init__(self)
 
+    self.name = name
+
     # Internal lock
     self.__lock = threading.Lock()
 
     # Queue containing waiting acquires
     self.__pending = []
-
-    # Active and inactive conditions for shared locks
-    self.__active_shr_c = self.__condition_class(self.__lock)
-    self.__inactive_shr_c = self.__condition_class(self.__lock)
+    self.__pending_by_prio = {}
+    self.__pending_shared = {}
 
     # Current lock holders
     self.__shr = set()
@@ -402,12 +482,78 @@ class SharedLock(object):
     # is this lock in the deleted state?
     self.__deleted = False
 
+    # Register with lock monitor
+    if monitor:
+      monitor.RegisterLock(self)
+
+  def GetInfo(self, fields):
+    """Retrieves information for querying locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    self.__lock.acquire()
+    try:
+      info = []
+
+      # Note: to avoid unintentional race conditions, no references to
+      # modifiable objects should be returned unless they were created in this
+      # function.
+      for fname in fields:
+        if fname == "name":
+          info.append(self.name)
+        elif fname == "mode":
+          if self.__deleted:
+            info.append("deleted")
+            assert not (self.__exc or self.__shr)
+          elif self.__exc:
+            info.append(_EXCLUSIVE_TEXT)
+          elif self.__shr:
+            info.append(_SHARED_TEXT)
+          else:
+            info.append(None)
+        elif fname == "owner":
+          if self.__exc:
+            owner = [self.__exc]
+          else:
+            owner = self.__shr
+
+          if owner:
+            assert not self.__deleted
+            info.append([i.getName() for i in owner])
+          else:
+            info.append(None)
+        elif fname == "pending":
+          data = []
+
+          # Sorting instead of copying and using heaq functions for simplicity
+          for (_, prioqueue) in sorted(self.__pending):
+            for cond in prioqueue:
+              if cond.shared:
+                mode = _SHARED_TEXT
+              else:
+                mode = _EXCLUSIVE_TEXT
+
+              # This function should be fast as it runs with the lock held.
+              # Hence not using utils.NiceSort.
+              data.append((mode, sorted(i.getName()
+                                        for i in cond.get_waiting())))
+
+          info.append(data)
+        else:
+          raise errors.OpExecError("Invalid query field '%s'" % fname)
+
+      return info
+    finally:
+      self.__lock.release()
+
   def __check_deleted(self):
     """Raises an exception if the lock has been deleted.
 
     """
     if self.__deleted:
-      raise errors.LockError("Deleted lock")
+      raise errors.LockError("Deleted lock %s" % self.name)
 
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?
@@ -458,7 +604,23 @@ class SharedLock(object):
     """
     self.__lock.acquire()
     try:
-      return len(self.__pending)
+      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
+    finally:
+      self.__lock.release()
+
+  def _check_empty(self):
+    """Checks whether there are any pending acquires.
+
+    @rtype: bool
+
+    """
+    self.__lock.acquire()
+    try:
+      # Order is important: __find_first_pending_queue modifies __pending
+      return not (self.__find_first_pending_queue() or
+                  self.__pending or
+                  self.__pending_by_prio or
+                  self.__pending_shared)
     finally:
       self.__lock.release()
 
@@ -480,26 +642,52 @@ class SharedLock(object):
     else:
       return len(self.__shr) == 0 and self.__exc is None
 
+  def __find_first_pending_queue(self):
+    """Tries to find the topmost queued entry with pending acquires.
+
+    Removes empty entries while going through the list.
+
+    """
+    while self.__pending:
+      (priority, prioqueue) = self.__pending[0]
+
+      if not prioqueue:
+        heapq.heappop(self.__pending)
+        del self.__pending_by_prio[priority]
+        assert priority not in self.__pending_shared
+        continue
+
+      if prioqueue:
+        return prioqueue
+
+    return None
+
   def __is_on_top(self, cond):
     """Checks whether the passed condition is on top of the queue.
 
     The caller must make sure the queue isn't empty.
 
     """
-    return self.__pending[0] == cond
+    return cond == self.__find_first_pending_queue()[0]
 
-  def __acquire_unlocked(self, shared, timeout):
+  def __acquire_unlocked(self, shared, timeout, priority):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @param timeout: maximum waiting time before giving up
+    @type priority: integer
+    @param priority: Priority for acquiring lock
 
     """
     self.__check_deleted()
 
     # We cannot acquire the lock if we already have it
-    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+                                   " %s" % self.name)
+
+    # Remove empty entries from queue
+    self.__find_first_pending_queue()
 
     # Check whether someone else holds the lock or there are pending acquires.
     if not self.__pending and self.__can_acquire(shared):
@@ -507,20 +695,37 @@ class SharedLock(object):
       self.__do_acquire(shared)
       return True
 
-    if shared:
-      wait_condition = self.__active_shr_c
+    prioqueue = self.__pending_by_prio.get(priority, None)
 
-      # Check if we're not yet in the queue
-      if wait_condition not in self.__pending:
-        self.__pending.append(wait_condition)
+    if shared:
+      # Try to re-use condition for shared acquire
+      wait_condition = self.__pending_shared.get(priority, None)
+      assert (wait_condition is None or
+              (wait_condition.shared and wait_condition in prioqueue))
     else:
-      wait_condition = self.__condition_class(self.__lock)
-      # Always add to queue
-      self.__pending.append(wait_condition)
+      wait_condition = None
+
+    if wait_condition is None:
+      if prioqueue is None:
+        assert priority not in self.__pending_by_prio
+
+        prioqueue = []
+        heapq.heappush(self.__pending, (priority, prioqueue))
+        self.__pending_by_prio[priority] = prioqueue
+
+      wait_condition = self.__condition_class(self.__lock, shared)
+      prioqueue.append(wait_condition)
+
+      if shared:
+        # Keep reference for further shared acquires on same priority. This is
+        # better than trying to find it in the list of pending acquires.
+        assert priority not in self.__pending_shared
+        self.__pending_shared[priority] = wait_condition
 
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
+      # TODO: Decrease timeout with spurious notifications
       while not (self.__is_on_top(wait_condition) and
                  self.__can_acquire(shared)):
         # Wait for notification
@@ -537,30 +742,38 @@ class SharedLock(object):
         return True
     finally:
       # Remove condition from queue if there are no more waiters
-      if not wait_condition.has_waiting() and not self.__deleted:
-        self.__pending.remove(wait_condition)
+      if not wait_condition.has_waiting():
+        prioqueue.remove(wait_condition)
+        if wait_condition.shared:
+          del self.__pending_shared[priority]
 
     return False
 
-  def acquire(self, shared=0, timeout=None, test_notify=None):
+  def acquire(self, shared=0, timeout=None, priority=None,
+              test_notify=None):
     """Acquire a shared lock.
 
-    @type shared: int
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type priority: integer
+    @param priority: Priority for acquiring lock
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
     """
+    if priority is None:
+      priority = _DEFAULT_PRIORITY
+
     self.__lock.acquire()
     try:
       # We already got the lock, notify now
       if __debug__ and callable(test_notify):
         test_notify()
 
-      return self.__acquire_unlocked(shared, timeout)
+      return self.__acquire_unlocked(shared, timeout, priority)
     finally:
       self.__lock.release()
 
@@ -583,18 +796,14 @@ class SharedLock(object):
         self.__shr.remove(threading.currentThread())
 
       # Notify topmost condition in queue
-      if self.__pending:
-        first_condition = self.__pending[0]
-        first_condition.notifyAll()
-
-        if first_condition == self.__active_shr_c:
-          self.__active_shr_c = self.__inactive_shr_c
-          self.__inactive_shr_c = first_condition
+      prioqueue = self.__find_first_pending_queue()
+      if prioqueue:
+        prioqueue[0].notifyAll()
 
     finally:
       self.__lock.release()
 
-  def delete(self, timeout=None):
+  def delete(self, timeout=None, priority=None):
     """Delete a Shared Lock.
 
     This operation will declare the lock for removal. First the lock will be
@@ -603,8 +812,13 @@ class SharedLock(object):
 
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type priority: integer
+    @param priority: Priority for acquiring lock
 
     """
+    if priority is None:
+      priority = _DEFAULT_PRIORITY
+
     self.__lock.acquire()
     try:
       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
@@ -615,7 +829,7 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(0, timeout)
+        acquired = self.__acquire_unlocked(0, timeout, priority)
 
         assert self.__is_exclusive() and not self.__is_sharer(), \
           "Lock wasn't acquired in exclusive mode"
@@ -624,14 +838,27 @@ class SharedLock(object):
         self.__deleted = True
         self.__exc = None
 
+        assert not (self.__exc or self.__shr), "Found owner during deletion"
+
         # Notify all acquires. They'll throw an error.
-        while self.__pending:
-          self.__pending.pop().notifyAll()
+        for (_, prioqueue) in self.__pending:
+          for cond in prioqueue:
+            cond.notifyAll()
+
+        assert self.__deleted
 
       return acquired
     finally:
       self.__lock.release()
 
+  def _release_save(self):
+    shared = self.__is_sharer()
+    self.release()
+    return shared
+
+  def _acquire_restore(self, shared):
+    self.acquire(shared=shared)
+
 
 # Whenever we want to acquire a full LockSet we pass None as the value
 # to acquire.  Hide this behind this nicely named constant.
@@ -654,23 +881,35 @@ class LockSet:
 
   All the locks needed in the same set must be acquired together, though.
 
+  @type name: string
+  @ivar name: the name of the lockset
+
   """
-  def __init__(self, members=None):
+  def __init__(self, members, name, monitor=None):
     """Constructs a new LockSet.
 
+    @type members: list of strings
     @param members: initial members of the set
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register member locks
 
     """
+    assert members is not None, "members parameter is not a list"
+    self.name = name
+
+    # Lock monitor
+    self.__monitor = monitor
+
     # Used internally to guarantee coherency.
-    self.__lock = SharedLock()
+    self.__lock = SharedLock(name)
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
     self.__lockdict = {}
 
-    if members is not None:
-      for name in members:
-        self.__lockdict[name] = SharedLock()
+    for mname in members:
+      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+                                          monitor=monitor)
 
     # The owner dict contains the set of locks each thread owns. For
     # performance each thread can access its own key without a global lock on
@@ -681,6 +920,12 @@ class LockSet:
     # will be trouble.
     self.__owners = {}
 
+  def _GetLockName(self, mname):
+    """Returns the name for a member lock.
+
+    """
+    return "%s/%s" % (self.name, mname)
+
   def _is_owned(self):
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
@@ -720,7 +965,9 @@ class LockSet:
   def _release_and_delete_owned(self):
     """Release and delete all resources owned by the current thread"""
     for lname in self._list_owned():
-      self.__lockdict[lname].release()
+      lock = self.__lockdict[lname]
+      if lock._is_owned():
+        lock.release()
       self._del_owned(name=lname)
 
   def __names(self):
@@ -751,15 +998,20 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, timeout=None, shared=0, test_notify=None):
+  def acquire(self, names, timeout=None, shared=0, priority=None,
+              test_notify=None):
     """Acquire a set of resource locks.
 
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks
+    @type priority: integer
+    @param priority: Priority for acquiring locks
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
@@ -773,28 +1025,24 @@ class LockSet:
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+                                  " (lockset %s)" % self.name)
+
+    if priority is None:
+      priority = _DEFAULT_PRIORITY
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
-    remaining_timeout = timeout
-    if timeout is None:
-      start = None
-      calc_remaining_timeout = lambda: None
-    else:
-      start = time.time()
-      calc_remaining_timeout = lambda: (start + timeout) - time.time()
+    running_timeout = RunningTimeout(timeout, False)
 
     try:
       if names is not None:
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
           names = [names]
-        else:
-          names = sorted(names)
 
-        return self.__acquire_inner(names, False, shared,
-                                    calc_remaining_timeout, test_notify)
+        return self.__acquire_inner(names, False, shared, priority,
+                                    running_timeout.Remaining, test_notify)
 
       else:
         # If no names are given acquire the whole set by not letting new names
@@ -802,19 +1050,19 @@ class LockSet:
         # Some of them may then be deleted later, but we'll cope with this.
         #
         # We'd like to acquire this lock in a shared way, as it's nice if
-        # everybody else can use the instances at the same time. If are
+        # everybody else can use the instances at the same time. If we are
         # acquiring them exclusively though they won't be able to do this
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
-        if not self.__lock.acquire(shared=shared,
-                                   timeout=calc_remaining_timeout()):
+        if not self.__lock.acquire(shared=shared, priority=priority,
+                                   timeout=running_timeout.Remaining()):
           raise _AcquireTimeout()
         try:
           # note we own the set-lock
           self._add_owned()
 
-          return self.__acquire_inner(self.__names(), True, shared,
-                                      calc_remaining_timeout, test_notify)
+          return self.__acquire_inner(self.__names(), True, shared, priority,
+                                      running_timeout.Remaining, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
@@ -826,26 +1074,37 @@ class LockSet:
     except _AcquireTimeout:
       return None
 
-  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
-    """
+  def __acquire_inner(self, names, want_all, shared, priority,
+                      timeout_fn, test_notify):
+    """Inner logic for acquiring a number of locks.
+
+    @param names: Names of the locks to be acquired
+    @param want_all: Whether all locks in the set should be acquired
+    @param shared: Whether to acquire in shared mode
+    @param timeout_fn: Function returning remaining timeout
+    @param priority: Priority for acquiring locks
+    @param test_notify: Special callback function for unittesting
 
     """
     acquire_list = []
 
     # First we look the locks up on __lockdict. We have no way of being sure
     # they will still be there after, but this makes it a lot faster should
-    # just one of them be the already wrong
-    for lname in utils.UniqueSequence(names):
+    # just one of them be the already wrong. Using a sorted sequence to prevent
+    # deadlocks.
+    for lname in sorted(utils.UniqueSequence(names)):
       try:
         lock = self.__lockdict[lname] # raises KeyError if lock is not there
-        acquire_list.append((lname, lock))
       except KeyError:
         if want_all:
           # We are acquiring all the set, it doesn't matter if this particular
           # element is not there anymore.
           continue
 
-        raise errors.LockError("Non-existing lock in set (%s)" % lname)
+        raise errors.LockError("Non-existing lock %s in set %s" %
+                               (lname, self.name))
+
+      acquire_list.append((lname, lock))
 
     # This will hold the locknames we effectively acquired.
     acquired = set()
@@ -863,12 +1122,11 @@ class LockSet:
           test_notify_fn = None
 
         timeout = timeout_fn()
-        if timeout is not None and timeout < 0:
-          raise _AcquireTimeout()
 
         try:
           # raises LockError if the lock was deleted
           acq_success = lock.acquire(shared=shared, timeout=timeout,
+                                     priority=priority,
                                      test_notify=test_notify_fn)
         except errors.LockError:
           if want_all:
@@ -876,14 +1134,16 @@ class LockSet:
             # particular element is not there anymore.
             continue
 
-          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+          raise errors.LockError("Non-existing lock %s in set %s" %
+                                 (lname, self.name))
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
-            raise errors.LockError("Failed to get lock %s" % lname)
+            raise errors.LockError("Failed to get lock %s (set %s)" %
+                                   (lname, self.name))
 
           raise _AcquireTimeout()
 
@@ -913,11 +1173,13 @@ class LockSet:
     You must have acquired the locks, either in shared or in exclusive mode,
     before releasing them.
 
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level).
 
     """
-    assert self._is_owned(), "release() on lock set while not owner"
+    assert self._is_owned(), ("release() on lock set %s while not owner" %
+                              self.name)
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
@@ -928,8 +1190,8 @@ class LockSet:
     else:
       names = set(names)
       assert self._list_owned().issuperset(names), (
-               "release() on unheld resources %s" %
-               names.difference(self._list_owned()))
+               "release() on unheld resources %s (set %s)" %
+               (names.difference(self._list_owned()), self.name))
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
@@ -946,14 +1208,18 @@ class LockSet:
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
 
+    @type names: list of strings
     @param names: names of the new elements to add
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: pre-acquire the new resource?
+    @type shared: integer (0/1) used as a boolean
     @param shared: is the pre-acquisition shared?
 
     """
     # Check we don't already own locks at this level
     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
-      "Cannot add locks if the set is only partially owned, or shared"
+      ("Cannot add locks if the set %s is only partially owned, or shared" %
+       self.name)
 
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
@@ -972,12 +1238,15 @@ class LockSet:
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
         # concurrency even if the user doesn't want it.
-        raise errors.LockError("duplicate add() (%s)" % invalid_names)
+        raise errors.LockError("duplicate add(%s) on lockset %s" %
+                               (invalid_names, self.name))
 
       for lockname in names:
-        lock = SharedLock()
+        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
 
         if acquired:
+          # No need for priority or timeout here as this lock has just been
+          # created
           lock.acquire(shared=shared)
           # now the lock cannot be deleted, we have it!
           try:
@@ -1007,6 +1276,7 @@ class LockSet:
     You can either not hold anything in the lockset or already hold a superset
     of the elements you want to delete, exclusively.
 
+    @type names: list of strings
     @param names: names of the resource to remove.
 
     @return: a list of locks which we removed; the list is always
@@ -1022,7 +1292,8 @@ class LockSet:
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
     assert not self._is_owned() or self._list_owned().issuperset(names), (
-      "remove() on acquired lockset while not owning all elements")
+      "remove() on acquired lockset %s while not owning all elements" %
+      self.name)
 
     removed = []
 
@@ -1037,7 +1308,8 @@ class LockSet:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
-        assert not self._is_owned(), "remove failed while holding lockset"
+        assert not self._is_owned(), ("remove failed while holding lockset %s"
+                                      % self.name)
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
@@ -1110,13 +1382,24 @@ class GanetiLockManager:
 
     self.__class__._instance = self
 
+    self._monitor = LockMonitor()
+
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL]),
-      LEVEL_NODE: LockSet(nodes),
-      LEVEL_INSTANCE: LockSet(instances),
-    }
+      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instances, "instances",
+                              monitor=self._monitor),
+      }
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    See L{LockMonitor.QueryLocks}.
+
+    """
+    return self._monitor.QueryLocks(fields, sync)
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1149,9 +1432,9 @@ class GanetiLockManager:
     """
     # This way of checking only works if LEVELS[i] = i, which we check for in
     # the test cases.
-    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self):
+  def _BGL_owned(self): # pylint: disable-msg=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
@@ -1159,7 +1442,8 @@ class GanetiLockManager:
     """
     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
 
-  def _contains_BGL(self, level, names):
+  @staticmethod
+  def _contains_BGL(level, names): # pylint: disable-msg=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1168,17 +1452,21 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, timeout=None, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0, priority=None):
     """Acquire a set of resource locks, at the same level.
 
-    @param level: the level at which the locks shall be acquired;
-        it must be a member of LEVELS.
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be acquired
+    @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default
         an exclusive lock will be acquired
     @type timeout: float
     @param timeout: Maximum time to acquire all locks
+    @type priority: integer
+    @param priority: Priority for acquiring lock
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1197,7 +1485,8 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
+                                         priority=priority)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1205,8 +1494,9 @@ class GanetiLockManager:
     You must have acquired the locks, either in shared or in exclusive
     mode, before releasing them.
 
-    @param level: the level at which the locks shall be released;
-        it must be a member of LEVELS
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be released
+    @type names: list of strings, or None
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -1216,8 +1506,8 @@ class GanetiLockManager:
             not self._upper_owned(LEVEL_CLUSTER)), (
             "Cannot release the Big Ganeti Lock while holding something"
             " at upper levels (%r)" %
-            (", ".join(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
-                        for i in self.__keyring.keys()]), ))
+            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+                              for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1225,10 +1515,13 @@ class GanetiLockManager:
   def add(self, level, names, acquired=0, shared=0):
     """Add locks at the specified level.
 
-    @param level: the level at which the locks shall be added;
-        it must be a member of LEVELS_MOD.
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be added
+    @type names: list of strings
     @param names: names of the locks to acquire
+    @type acquired: integer (0/1) used as a boolean
     @param acquired: whether to acquire the newly added locks
+    @type shared: integer (0/1) used as a boolean
     @param shared: whether the acquisition will be shared
 
     """
@@ -1245,8 +1538,9 @@ class GanetiLockManager:
     You must either already own the locks you are trying to remove
     exclusively or not own any lock at an upper level.
 
-    @param level: the level at which the locks shall be removed;
-        it must be a member of LEVELS_MOD
+    @type level: member of locking.LEVELS_MOD
+    @param level: the level at which the locks shall be removed
+    @type names: list of strings
     @param names: the names of the locks which shall be removed
         (special lock names, or instance/node names)
 
@@ -1261,3 +1555,59 @@ class GanetiLockManager:
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Tracked locks. Weak references are used to avoid issues with circular
+    # references and deletion.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, lock):
+    """Registers a new lock.
+
+    """
+    logging.debug("Registering lock %s", lock.name)
+    assert lock not in self._locks, "Duplicate lock registration"
+    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
+           "Found duplicate lock name"
+    self._locks[lock] = None
+
+  @ssynchronized(_LOCK_ATTR)
+  def _GetLockInfo(self, fields):
+    """Get information from all locks while the monitor lock is held.
+
+    """
+    result = {}
+
+    for lock in self._locks.keys():
+      assert lock.name not in result, "Found duplicate lock name"
+      result[lock.name] = lock.GetInfo(fields)
+
+    return result
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @type sync: boolean
+    @param sync: Whether to operate in synchronous mode
+
+    """
+    if sync:
+      raise NotImplementedError("Synchronous queries are not implemented")
+
+    # Get all data without sorting
+    result = self._GetLockInfo(fields)
+
+    # Sort by name
+    return [result[name] for name in utils.NiceSort(result.keys())]