utils.log: Split formatter building into separate function
[ganeti-local] / lib / locking.py
index 3e25917..3575085 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 import os
 import select
 import threading
-import time
 import errno
+import weakref
+import logging
+import heapq
+import operator
 
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
+from ganeti import query
 
 
-def ssynchronized(lock, shared=0):
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+_DELETED_TEXT = "deleted"
+
+_DEFAULT_PRIORITY = 0
+
+
+def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
 
   Calls the function holding the given lock, either in exclusive or shared
   mode. It requires the passed lock to be a SharedLock (or support its
   semantics).
 
+  @type mylock: lockable object or string
+  @param mylock: lock to acquire or class member name of the lock to acquire
+
   """
   def wrap(fn):
     def sync_function(*args, **kwargs):
+      if isinstance(mylock, basestring):
+        assert args, "cannot ssynchronize on non-class method: self not found"
+        # args[0] is "self"
+        lock = getattr(args[0], mylock)
+      else:
+        lock = mylock
       lock.acquire(shared=shared)
       try:
         return fn(*args, **kwargs)
@@ -55,59 +75,6 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
-class RunningTimeout(object):
-  """Class to calculate remaining timeout when doing several operations.
-
-  """
-  __slots__ = [
-    "_allow_negative",
-    "_start_time",
-    "_time_fn",
-    "_timeout",
-    ]
-
-  def __init__(self, timeout, allow_negative, _time_fn=time.time):
-    """Initializes this class.
-
-    @type timeout: float
-    @param timeout: Timeout duration
-    @type allow_negative: bool
-    @param allow_negative: Whether to return values below zero
-    @param _time_fn: Time function for unittests
-
-    """
-    object.__init__(self)
-
-    if timeout is not None and timeout < 0.0:
-      raise ValueError("Timeout must not be negative")
-
-    self._timeout = timeout
-    self._allow_negative = allow_negative
-    self._time_fn = _time_fn
-
-    self._start_time = None
-
-  def Remaining(self):
-    """Returns the remaining timeout.
-
-    """
-    if self._timeout is None:
-      return None
-
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = self._time_fn()
-
-    # Calculate remaining time
-    remaining_timeout = self._start_time + self._timeout - self._time_fn()
-
-    if not self._allow_negative:
-      # Ensure timeout is always >= 0
-      return max(0.0, remaining_timeout)
-
-    return remaining_timeout
-
-
 class _SingleNotifyPipeConditionWaiter(object):
   """Helper class for SingleNotifyPipeCondition
 
@@ -137,7 +104,7 @@ class _SingleNotifyPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    running_timeout = RunningTimeout(timeout, True)
+    running_timeout = utils.RunningTimeout(timeout, True)
 
     while True:
       remaining_time = running_timeout.Remaining()
@@ -171,6 +138,9 @@ class _BaseCondition(object):
     "_lock",
     "acquire",
     "release",
+    "_is_owned",
+    "_acquire_restore",
+    "_release_save",
     ]
 
   def __init__(self, lock):
@@ -182,9 +152,18 @@ class _BaseCondition(object):
     """
     object.__init__(self)
 
-    # Recursive locks are not supported
-    assert not hasattr(lock, "_acquire_restore")
-    assert not hasattr(lock, "_release_save")
+    try:
+      self._release_save = lock._release_save
+    except AttributeError:
+      self._release_save = self._base_release_save
+    try:
+      self._acquire_restore = lock._acquire_restore
+    except AttributeError:
+      self._acquire_restore = self._base_acquire_restore
+    try:
+      self._is_owned = lock._is_owned
+    except AttributeError:
+      self._is_owned = self._base_is_owned
 
     self._lock = lock
 
@@ -192,16 +171,21 @@ class _BaseCondition(object):
     self.acquire = lock.acquire
     self.release = lock.release
 
-  def _is_owned(self):
+  def _base_is_owned(self):
     """Check whether lock is owned by current thread.
 
     """
     if self._lock.acquire(0):
       self._lock.release()
       return False
-
     return True
 
+  def _base_release_save(self):
+    self._lock.release()
+
+  def _base_acquire_restore(self, _):
+    self._lock.acquire()
+
   def _check_owned(self):
     """Raise an exception if the current thread doesn't own the lock.
 
@@ -280,13 +264,13 @@ class SingleNotifyPipeCondition(_BaseCondition):
         self._poller.register(self._read_fd, select.POLLHUP)
 
       wait_fn = self._waiter_class(self._poller, self._read_fd)
-      self.release()
+      state = self._release_save()
       try:
         # Wait for notification
         wait_fn(timeout)
       finally:
         # Re-acquire lock
-        self.acquire()
+        self._acquire_restore(state)
     finally:
       self._nwaiters -= 1
       if self._nwaiters == 0:
@@ -315,7 +299,7 @@ class PipeCondition(_BaseCondition):
 
   """
   __slots__ = [
-    "_nwaiters",
+    "_waiters",
     "_single_condition",
     ]
 
@@ -326,7 +310,7 @@ class PipeCondition(_BaseCondition):
 
     """
     _BaseCondition.__init__(self, lock)
-    self._nwaiters = 0
+    self._waiters = set()
     self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
@@ -340,15 +324,14 @@ class PipeCondition(_BaseCondition):
 
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    my_condition = self._single_condition
+    cond = self._single_condition
 
-    assert self._nwaiters >= 0
-    self._nwaiters += 1
+    self._waiters.add(threading.currentThread())
     try:
-      my_condition.wait(timeout)
+      cond.wait(timeout)
     finally:
-      assert self._nwaiters > 0
-      self._nwaiters -= 1
+      self._check_owned()
+      self._waiters.remove(threading.currentThread())
 
   def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
@@ -358,102 +341,88 @@ class PipeCondition(_BaseCondition):
     self._single_condition.notifyAll()
     self._single_condition = self._single_condition_class(self._lock)
 
-  def has_waiting(self):
-    """Returns whether there are active waiters.
+  def get_waiting(self):
+    """Returns a list of all waiting threads.
 
     """
     self._check_owned()
 
-    return bool(self._nwaiters)
+    return self._waiters
 
+  def has_waiting(self):
+    """Returns whether there are active waiters.
+
+    """
+    self._check_owned()
 
-class _CountingCondition(object):
-  """Wrapper for Python's built-in threading.Condition class.
+    return bool(self._waiters)
 
-  This wrapper keeps a count of active waiters. We can't access the internal
-  "__waiters" attribute of threading.Condition because it's not thread-safe.
 
-  """
+class _PipeConditionWithMode(PipeCondition):
   __slots__ = [
-    "_cond",
-    "_nwaiters",
+    "shared",
     ]
 
-  def __init__(self, lock):
+  def __init__(self, lock, shared):
     """Initializes this class.
 
     """
-    object.__init__(self)
-    self._cond = threading.Condition(lock=lock)
-    self._nwaiters = 0
-
-  def notifyAll(self): # pylint: disable-msg=C0103
-    """Notifies the condition.
-
-    """
-    return self._cond.notifyAll()
-
-  def wait(self, timeout=None):
-    """Waits for the condition to be notified.
-
-    @type timeout: float or None
-    @param timeout: Waiting timeout (can be None)
-
-    """
-    assert self._nwaiters >= 0
-
-    self._nwaiters += 1
-    try:
-      return self._cond.wait(timeout=timeout)
-    finally:
-      self._nwaiters -= 1
-
-  def has_waiting(self):
-    """Returns whether there are active waiters.
-
-    """
-    return bool(self._nwaiters)
+    self.shared = shared
+    PipeCondition.__init__(self, lock)
 
 
 class SharedLock(object):
   """Implements a shared lock.
 
-  Multiple threads can acquire the lock in a shared way, calling
-  acquire_shared().  In order to acquire the lock in an exclusive way threads
-  can call acquire_exclusive().
+  Multiple threads can acquire the lock in a shared way by calling
+  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+  threads can call C{acquire(shared=0)}.
 
-  The lock prevents starvation but does not guarantee that threads will acquire
-  the shared lock in the order they queued for it, just that they will
-  eventually do so.
+  Notes on data structures: C{__pending} contains a priority queue (heapq) of
+  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
+  ...]}. Each per-priority queue contains a normal in-order list of conditions
+  to be notified when the lock can be acquired. Shared locks are grouped
+  together by priority and the condition for them is stored in
+  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
+  references for the per-priority queues indexed by priority for faster access.
+
+  @type name: string
+  @ivar name: the name of the lock
 
   """
   __slots__ = [
-    "__active_shr_c",
-    "__inactive_shr_c",
+    "__weakref__",
     "__deleted",
     "__exc",
     "__lock",
     "__pending",
+    "__pending_by_prio",
+    "__pending_shared",
     "__shr",
+    "name",
     ]
 
-  __condition_class = PipeCondition
+  __condition_class = _PipeConditionWithMode
 
-  def __init__(self):
+  def __init__(self, name, monitor=None):
     """Construct a new SharedLock.
 
+    @param name: the name of the lock
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register
+
     """
     object.__init__(self)
 
+    self.name = name
+
     # Internal lock
     self.__lock = threading.Lock()
 
     # Queue containing waiting acquires
     self.__pending = []
-
-    # Active and inactive conditions for shared locks
-    self.__active_shr_c = self.__condition_class(self.__lock)
-    self.__inactive_shr_c = self.__condition_class(self.__lock)
+    self.__pending_by_prio = {}
+    self.__pending_shared = {}
 
     # Current lock holders
     self.__shr = set()
@@ -462,12 +431,73 @@ class SharedLock(object):
     # is this lock in the deleted state?
     self.__deleted = False
 
+    # Register with lock monitor
+    if monitor:
+      monitor.RegisterLock(self)
+
+  def GetInfo(self, requested):
+    """Retrieves information for querying locks.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    self.__lock.acquire()
+    try:
+      # Note: to avoid unintentional race conditions, no references to
+      # modifiable objects should be returned unless they were created in this
+      # function.
+      mode = None
+      owner_names = None
+
+      if query.LQ_MODE in requested:
+        if self.__deleted:
+          mode = _DELETED_TEXT
+          assert not (self.__exc or self.__shr)
+        elif self.__exc:
+          mode = _EXCLUSIVE_TEXT
+        elif self.__shr:
+          mode = _SHARED_TEXT
+
+      # Current owner(s) are wanted
+      if query.LQ_OWNER in requested:
+        if self.__exc:
+          owner = [self.__exc]
+        else:
+          owner = self.__shr
+
+        if owner:
+          assert not self.__deleted
+          owner_names = [i.getName() for i in owner]
+
+      # Pending acquires are wanted
+      if query.LQ_PENDING in requested:
+        pending = []
+
+        # Sorting instead of copying and using heaq functions for simplicity
+        for (_, prioqueue) in sorted(self.__pending):
+          for cond in prioqueue:
+            if cond.shared:
+              pendmode = _SHARED_TEXT
+            else:
+              pendmode = _EXCLUSIVE_TEXT
+
+            # List of names will be sorted in L{query._GetLockPending}
+            pending.append((pendmode, [i.getName()
+                                       for i in cond.get_waiting()]))
+      else:
+        pending = None
+
+      return (self.name, mode, owner_names, pending)
+    finally:
+      self.__lock.release()
+
   def __check_deleted(self):
     """Raises an exception if the lock has been deleted.
 
     """
     if self.__deleted:
-      raise errors.LockError("Deleted lock")
+      raise errors.LockError("Deleted lock %s" % self.name)
 
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?
@@ -518,7 +548,23 @@ class SharedLock(object):
     """
     self.__lock.acquire()
     try:
-      return len(self.__pending)
+      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
+    finally:
+      self.__lock.release()
+
+  def _check_empty(self):
+    """Checks whether there are any pending acquires.
+
+    @rtype: bool
+
+    """
+    self.__lock.acquire()
+    try:
+      # Order is important: __find_first_pending_queue modifies __pending
+      return not (self.__find_first_pending_queue() or
+                  self.__pending or
+                  self.__pending_by_prio or
+                  self.__pending_shared)
     finally:
       self.__lock.release()
 
@@ -540,26 +586,52 @@ class SharedLock(object):
     else:
       return len(self.__shr) == 0 and self.__exc is None
 
+  def __find_first_pending_queue(self):
+    """Tries to find the topmost queued entry with pending acquires.
+
+    Removes empty entries while going through the list.
+
+    """
+    while self.__pending:
+      (priority, prioqueue) = self.__pending[0]
+
+      if not prioqueue:
+        heapq.heappop(self.__pending)
+        del self.__pending_by_prio[priority]
+        assert priority not in self.__pending_shared
+        continue
+
+      if prioqueue:
+        return prioqueue
+
+    return None
+
   def __is_on_top(self, cond):
     """Checks whether the passed condition is on top of the queue.
 
     The caller must make sure the queue isn't empty.
 
     """
-    return self.__pending[0] == cond
+    return cond == self.__find_first_pending_queue()[0]
 
-  def __acquire_unlocked(self, shared, timeout):
+  def __acquire_unlocked(self, shared, timeout, priority):
     """Acquire a shared lock.
 
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @param timeout: maximum waiting time before giving up
+    @type priority: integer
+    @param priority: Priority for acquiring lock
 
     """
     self.__check_deleted()
 
     # We cannot acquire the lock if we already have it
-    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+                                   " %s" % self.name)
+
+    # Remove empty entries from queue
+    self.__find_first_pending_queue()
 
     # Check whether someone else holds the lock or there are pending acquires.
     if not self.__pending and self.__can_acquire(shared):
@@ -567,20 +639,37 @@ class SharedLock(object):
       self.__do_acquire(shared)
       return True
 
-    if shared:
-      wait_condition = self.__active_shr_c
+    prioqueue = self.__pending_by_prio.get(priority, None)
 
-      # Check if we're not yet in the queue
-      if wait_condition not in self.__pending:
-        self.__pending.append(wait_condition)
+    if shared:
+      # Try to re-use condition for shared acquire
+      wait_condition = self.__pending_shared.get(priority, None)
+      assert (wait_condition is None or
+              (wait_condition.shared and wait_condition in prioqueue))
     else:
-      wait_condition = self.__condition_class(self.__lock)
-      # Always add to queue
-      self.__pending.append(wait_condition)
+      wait_condition = None
+
+    if wait_condition is None:
+      if prioqueue is None:
+        assert priority not in self.__pending_by_prio
+
+        prioqueue = []
+        heapq.heappush(self.__pending, (priority, prioqueue))
+        self.__pending_by_prio[priority] = prioqueue
+
+      wait_condition = self.__condition_class(self.__lock, shared)
+      prioqueue.append(wait_condition)
+
+      if shared:
+        # Keep reference for further shared acquires on same priority. This is
+        # better than trying to find it in the list of pending acquires.
+        assert priority not in self.__pending_shared
+        self.__pending_shared[priority] = wait_condition
 
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
+      # TODO: Decrease timeout with spurious notifications
       while not (self.__is_on_top(wait_condition) and
                  self.__can_acquire(shared)):
         # Wait for notification
@@ -597,12 +686,15 @@ class SharedLock(object):
         return True
     finally:
       # Remove condition from queue if there are no more waiters
-      if not wait_condition.has_waiting() and not self.__deleted:
-        self.__pending.remove(wait_condition)
+      if not wait_condition.has_waiting():
+        prioqueue.remove(wait_condition)
+        if wait_condition.shared:
+          del self.__pending_shared[priority]
 
     return False
 
-  def acquire(self, shared=0, timeout=None, test_notify=None):
+  def acquire(self, shared=0, timeout=None, priority=None,
+              test_notify=None):
     """Acquire a shared lock.
 
     @type shared: integer (0/1) used as a boolean
@@ -610,17 +702,22 @@ class SharedLock(object):
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type priority: integer
+    @param priority: Priority for acquiring lock
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
     """
+    if priority is None:
+      priority = _DEFAULT_PRIORITY
+
     self.__lock.acquire()
     try:
       # We already got the lock, notify now
       if __debug__ and callable(test_notify):
         test_notify()
 
-      return self.__acquire_unlocked(shared, timeout)
+      return self.__acquire_unlocked(shared, timeout, priority)
     finally:
       self.__lock.release()
 
@@ -643,18 +740,14 @@ class SharedLock(object):
         self.__shr.remove(threading.currentThread())
 
       # Notify topmost condition in queue
-      if self.__pending:
-        first_condition = self.__pending[0]
-        first_condition.notifyAll()
-
-        if first_condition == self.__active_shr_c:
-          self.__active_shr_c = self.__inactive_shr_c
-          self.__inactive_shr_c = first_condition
+      prioqueue = self.__find_first_pending_queue()
+      if prioqueue:
+        prioqueue[0].notifyAll()
 
     finally:
       self.__lock.release()
 
-  def delete(self, timeout=None):
+  def delete(self, timeout=None, priority=None):
     """Delete a Shared Lock.
 
     This operation will declare the lock for removal. First the lock will be
@@ -663,8 +756,13 @@ class SharedLock(object):
 
     @type timeout: float
     @param timeout: maximum waiting time before giving up
+    @type priority: integer
+    @param priority: Priority for acquiring lock
 
     """
+    if priority is None:
+      priority = _DEFAULT_PRIORITY
+
     self.__lock.acquire()
     try:
       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
@@ -675,7 +773,7 @@ class SharedLock(object):
       acquired = self.__is_exclusive()
 
       if not acquired:
-        acquired = self.__acquire_unlocked(0, timeout)
+        acquired = self.__acquire_unlocked(0, timeout, priority)
 
         assert self.__is_exclusive() and not self.__is_sharer(), \
           "Lock wasn't acquired in exclusive mode"
@@ -684,14 +782,27 @@ class SharedLock(object):
         self.__deleted = True
         self.__exc = None
 
+        assert not (self.__exc or self.__shr), "Found owner during deletion"
+
         # Notify all acquires. They'll throw an error.
-        while self.__pending:
-          self.__pending.pop().notifyAll()
+        for (_, prioqueue) in self.__pending:
+          for cond in prioqueue:
+            cond.notifyAll()
+
+        assert self.__deleted
 
       return acquired
     finally:
       self.__lock.release()
 
+  def _release_save(self):
+    shared = self.__is_sharer()
+    self.release()
+    return shared
+
+  def _acquire_restore(self, shared):
+    self.acquire(shared=shared)
+
 
 # Whenever we want to acquire a full LockSet we pass None as the value
 # to acquire.  Hide this behind this nicely named constant.
@@ -714,24 +825,35 @@ class LockSet:
 
   All the locks needed in the same set must be acquired together, though.
 
+  @type name: string
+  @ivar name: the name of the lockset
+
   """
-  def __init__(self, members=None):
+  def __init__(self, members, name, monitor=None):
     """Constructs a new LockSet.
 
     @type members: list of strings
     @param members: initial members of the set
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register member locks
 
     """
+    assert members is not None, "members parameter is not a list"
+    self.name = name
+
+    # Lock monitor
+    self.__monitor = monitor
+
     # Used internally to guarantee coherency.
-    self.__lock = SharedLock()
+    self.__lock = SharedLock(name)
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
     self.__lockdict = {}
 
-    if members is not None:
-      for name in members:
-        self.__lockdict[name] = SharedLock()
+    for mname in members:
+      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+                                          monitor=monitor)
 
     # The owner dict contains the set of locks each thread owns. For
     # performance each thread can access its own key without a global lock on
@@ -742,6 +864,12 @@ class LockSet:
     # will be trouble.
     self.__owners = {}
 
+  def _GetLockName(self, mname):
+    """Returns the name for a member lock.
+
+    """
+    return "%s/%s" % (self.name, mname)
+
   def _is_owned(self):
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
@@ -814,7 +942,8 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, timeout=None, shared=0, test_notify=None):
+  def acquire(self, names, timeout=None, shared=0, priority=None,
+              test_notify=None):
     """Acquire a set of resource locks.
 
     @type names: list of strings (or string)
@@ -825,6 +954,8 @@ class LockSet:
         exclusive lock will be acquired
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks
+    @type priority: integer
+    @param priority: Priority for acquiring locks
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
@@ -838,11 +969,15 @@ class LockSet:
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+                                  " (lockset %s)" % self.name)
+
+    if priority is None:
+      priority = _DEFAULT_PRIORITY
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
-    running_timeout = RunningTimeout(timeout, False)
+    running_timeout = utils.RunningTimeout(timeout, False)
 
     try:
       if names is not None:
@@ -850,7 +985,7 @@ class LockSet:
         if isinstance(names, basestring):
           names = [names]
 
-        return self.__acquire_inner(names, False, shared,
+        return self.__acquire_inner(names, False, shared, priority,
                                     running_timeout.Remaining, test_notify)
 
       else:
@@ -859,18 +994,18 @@ class LockSet:
         # Some of them may then be deleted later, but we'll cope with this.
         #
         # We'd like to acquire this lock in a shared way, as it's nice if
-        # everybody else can use the instances at the same time. If are
+        # everybody else can use the instances at the same time. If we are
         # acquiring them exclusively though they won't be able to do this
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
-        if not self.__lock.acquire(shared=shared,
+        if not self.__lock.acquire(shared=shared, priority=priority,
                                    timeout=running_timeout.Remaining()):
           raise _AcquireTimeout()
         try:
           # note we own the set-lock
           self._add_owned()
 
-          return self.__acquire_inner(self.__names(), True, shared,
+          return self.__acquire_inner(self.__names(), True, shared, priority,
                                       running_timeout.Remaining, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
@@ -883,13 +1018,15 @@ class LockSet:
     except _AcquireTimeout:
       return None
 
-  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+  def __acquire_inner(self, names, want_all, shared, priority,
+                      timeout_fn, test_notify):
     """Inner logic for acquiring a number of locks.
 
     @param names: Names of the locks to be acquired
     @param want_all: Whether all locks in the set should be acquired
     @param shared: Whether to acquire in shared mode
     @param timeout_fn: Function returning remaining timeout
+    @param priority: Priority for acquiring locks
     @param test_notify: Special callback function for unittesting
 
     """
@@ -908,7 +1045,8 @@ class LockSet:
           # element is not there anymore.
           continue
 
-        raise errors.LockError("Non-existing lock in set (%s)" % lname)
+        raise errors.LockError("Non-existing lock %s in set %s (it may have"
+                               " been removed)" % (lname, self.name))
 
       acquire_list.append((lname, lock))
 
@@ -932,6 +1070,7 @@ class LockSet:
         try:
           # raises LockError if the lock was deleted
           acq_success = lock.acquire(shared=shared, timeout=timeout,
+                                     priority=priority,
                                      test_notify=test_notify_fn)
         except errors.LockError:
           if want_all:
@@ -939,14 +1078,16 @@ class LockSet:
             # particular element is not there anymore.
             continue
 
-          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+          raise errors.LockError("Non-existing lock %s in set %s (it may"
+                                 " have been removed)" % (lname, self.name))
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
-            raise errors.LockError("Failed to get lock %s" % lname)
+            raise errors.LockError("Failed to get lock %s (set %s)" %
+                                   (lname, self.name))
 
           raise _AcquireTimeout()
 
@@ -981,7 +1122,8 @@ class LockSet:
         (defaults to all the locks acquired at that level).
 
     """
-    assert self._is_owned(), "release() on lock set while not owner"
+    assert self._is_owned(), ("release() on lock set %s while not owner" %
+                              self.name)
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
@@ -992,8 +1134,8 @@ class LockSet:
     else:
       names = set(names)
       assert self._list_owned().issuperset(names), (
-               "release() on unheld resources %s" %
-               names.difference(self._list_owned()))
+               "release() on unheld resources %s (set %s)" %
+               (names.difference(self._list_owned()), self.name))
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
@@ -1020,7 +1162,8 @@ class LockSet:
     """
     # Check we don't already own locks at this level
     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
-      "Cannot add locks if the set is only partially owned, or shared"
+      ("Cannot add locks if the set %s is only partially owned, or shared" %
+       self.name)
 
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
@@ -1039,12 +1182,15 @@ class LockSet:
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
         # concurrency even if the user doesn't want it.
-        raise errors.LockError("duplicate add() (%s)" % invalid_names)
+        raise errors.LockError("duplicate add(%s) on lockset %s" %
+                               (invalid_names, self.name))
 
       for lockname in names:
-        lock = SharedLock()
+        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
 
         if acquired:
+          # No need for priority or timeout here as this lock has just been
+          # created
           lock.acquire(shared=shared)
           # now the lock cannot be deleted, we have it!
           try:
@@ -1090,7 +1236,8 @@ class LockSet:
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
     assert not self._is_owned() or self._list_owned().issuperset(names), (
-      "remove() on acquired lockset while not owning all elements")
+      "remove() on acquired lockset %s while not owning all elements" %
+      self.name)
 
     removed = []
 
@@ -1105,7 +1252,8 @@ class LockSet:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
-        assert not self._is_owned(), "remove failed while holding lockset"
+        assert not self._is_owned(), ("remove failed while holding lockset %s"
+                                      % self.name)
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
@@ -1133,18 +1281,21 @@ class LockSet:
 #   the same time.
 LEVEL_CLUSTER = 0
 LEVEL_INSTANCE = 1
-LEVEL_NODE = 2
+LEVEL_NODEGROUP = 2
+LEVEL_NODE = 3
 
 LEVELS = [LEVEL_CLUSTER,
           LEVEL_INSTANCE,
+          LEVEL_NODEGROUP,
           LEVEL_NODE]
 
 # Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
 
 LEVEL_NAMES = {
   LEVEL_CLUSTER: "cluster",
   LEVEL_INSTANCE: "instance",
+  LEVEL_NODEGROUP: "nodegroup",
   LEVEL_NODE: "node",
   }
 
@@ -1163,13 +1314,14 @@ class GanetiLockManager:
   """
   _instance = None
 
-  def __init__(self, nodes=None, instances=None):
+  def __init__(self, nodes, nodegroups, instances):
     """Constructs a new GanetiLockManager object.
 
     There should be only a GanetiLockManager object at any time, so this
     function raises an error if this is not the case.
 
     @param nodes: list of node names
+    @param nodegroups: list of nodegroup uuids
     @param instances: list of instance names
 
     """
@@ -1178,13 +1330,33 @@ class GanetiLockManager:
 
     self.__class__._instance = self
 
+    self._monitor = LockMonitor()
+
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL]),
-      LEVEL_NODE: LockSet(nodes),
-      LEVEL_INSTANCE: LockSet(instances),
-    }
+      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instances, "instances",
+                              monitor=self._monitor),
+      }
+
+  def QueryLocks(self, fields):
+    """Queries information from all locks.
+
+    See L{LockMonitor.QueryLocks}.
+
+    """
+    return self._monitor.QueryLocks(fields)
+
+  def OldStyleQueryLocks(self, fields):
+    """Queries information from all locks, returning old-style data.
+
+    See L{LockMonitor.OldStyleQueryLocks}.
+
+    """
+    return self._monitor.OldStyleQueryLocks(fields)
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1237,7 +1409,7 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, timeout=None, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0, priority=None):
     """Acquire a set of resource locks, at the same level.
 
     @type level: member of locking.LEVELS
@@ -1250,6 +1422,8 @@ class GanetiLockManager:
         an exclusive lock will be acquired
     @type timeout: float
     @param timeout: Maximum time to acquire all locks
+    @type priority: integer
+    @param priority: Priority for acquiring lock
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1268,7 +1442,8 @@ class GanetiLockManager:
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
+                                         priority=priority)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1337,3 +1512,73 @@ class GanetiLockManager:
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Tracked locks. Weak references are used to avoid issues with circular
+    # references and deletion.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, lock):
+    """Registers a new lock.
+
+    """
+    logging.debug("Registering lock %s", lock.name)
+    assert lock not in self._locks, "Duplicate lock registration"
+    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
+           "Found duplicate lock name"
+    self._locks[lock] = None
+
+  @ssynchronized(_LOCK_ATTR)
+  def _GetLockInfo(self, requested):
+    """Get information from all locks while the monitor lock is held.
+
+    """
+    return [lock.GetInfo(requested) for lock in self._locks.keys()]
+
+  def _Query(self, fields):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    qobj = query.Query(query.LOCK_FIELDS, fields)
+
+    # Get all data and sort by name
+    lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()),
+                              key=operator.itemgetter(0))
+
+    return (qobj, query.LockQueryData(lockinfo))
+
+  def QueryLocks(self, fields):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    (qobj, ctx) = self._Query(fields)
+
+    # Prepare query response
+    return query.GetQueryResponse(qobj, ctx)
+
+  def OldStyleQueryLocks(self, fields):
+    """Queries information from all locks, returning old-style data.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    (qobj, ctx) = self._Query(fields)
+
+    return qobj.OldStyleQuery(ctx)