Enable disk growth with exclusive storage
[ganeti-local] / lib / locking.py
index 9304cf9..7a23af6 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -20,7 +20,7 @@
 
 """Module implementing the Ganeti locking code."""
 
-# pylint: disable-msg=W0212
+# pylint: disable=W0212
 
 # W0212 since e.g. LockSet methods use (a lot) the internals of
 # SharedLock
 import os
 import select
 import threading
-import time
 import errno
 import weakref
 import logging
 import heapq
+import itertools
+import time
 
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
+from ganeti import query
 
 
 _EXCLUSIVE_TEXT = "exclusive"
 _SHARED_TEXT = "shared"
+_DELETED_TEXT = "deleted"
 
 _DEFAULT_PRIORITY = 0
 
+#: Minimum timeout required to consider scheduling a pending acquisition
+#: (seconds)
+_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
+
+# Internal lock acquisition modes for L{LockSet}
+(_LS_ACQUIRE_EXACT,
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
+
+_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
+  _LS_ACQUIRE_EXACT,
+  _LS_ACQUIRE_ALL,
+  _LS_ACQUIRE_OPPORTUNISTIC,
+  ])
+
 
 def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
@@ -73,59 +91,6 @@ def ssynchronized(mylock, shared=0):
   return wrap
 
 
-class RunningTimeout(object):
-  """Class to calculate remaining timeout when doing several operations.
-
-  """
-  __slots__ = [
-    "_allow_negative",
-    "_start_time",
-    "_time_fn",
-    "_timeout",
-    ]
-
-  def __init__(self, timeout, allow_negative, _time_fn=time.time):
-    """Initializes this class.
-
-    @type timeout: float
-    @param timeout: Timeout duration
-    @type allow_negative: bool
-    @param allow_negative: Whether to return values below zero
-    @param _time_fn: Time function for unittests
-
-    """
-    object.__init__(self)
-
-    if timeout is not None and timeout < 0.0:
-      raise ValueError("Timeout must not be negative")
-
-    self._timeout = timeout
-    self._allow_negative = allow_negative
-    self._time_fn = _time_fn
-
-    self._start_time = None
-
-  def Remaining(self):
-    """Returns the remaining timeout.
-
-    """
-    if self._timeout is None:
-      return None
-
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = self._time_fn()
-
-    # Calculate remaining time
-    remaining_timeout = self._start_time + self._timeout - self._time_fn()
-
-    if not self._allow_negative:
-      # Ensure timeout is always >= 0
-      return max(0.0, remaining_timeout)
-
-    return remaining_timeout
-
-
 class _SingleNotifyPipeConditionWaiter(object):
   """Helper class for SingleNotifyPipeCondition
 
@@ -155,7 +120,7 @@ class _SingleNotifyPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    running_timeout = RunningTimeout(timeout, True)
+    running_timeout = utils.RunningTimeout(timeout, True)
 
     while True:
       remaining_time = running_timeout.Remaining()
@@ -212,7 +177,7 @@ class _BaseCondition(object):
     except AttributeError:
       self._acquire_restore = self._base_acquire_restore
     try:
-      self._is_owned = lock._is_owned
+      self._is_owned = lock.is_owned
     except AttributeError:
       self._is_owned = self._base_is_owned
 
@@ -297,7 +262,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
       self._write_fd = None
     self._poller = None
 
-  def wait(self, timeout=None):
+  def wait(self, timeout):
     """Wait for a notification.
 
     @type timeout: float or None
@@ -327,7 +292,7 @@ class SingleNotifyPipeCondition(_BaseCondition):
       if self._nwaiters == 0:
         self._Cleanup()
 
-  def notifyAll(self): # pylint: disable-msg=C0103
+  def notifyAll(self): # pylint: disable=C0103
     """Close the writing side of the pipe to notify all waiters.
 
     """
@@ -364,7 +329,7 @@ class PipeCondition(_BaseCondition):
     self._waiters = set()
     self._single_condition = self._single_condition_class(self._lock)
 
-  def wait(self, timeout=None):
+  def wait(self, timeout):
     """Wait for a notification.
 
     @type timeout: float or None
@@ -384,7 +349,7 @@ class PipeCondition(_BaseCondition):
       self._check_owned()
       self._waiters.remove(threading.currentThread())
 
-  def notifyAll(self): # pylint: disable-msg=C0103
+  def notifyAll(self): # pylint: disable=C0103
     """Notify all currently waiting threads.
 
     """
@@ -408,6 +373,11 @@ class PipeCondition(_BaseCondition):
 
     return bool(self._waiters)
 
+  def __repr__(self):
+    return ("<%s.%s waiters=%s at %#x>" %
+            (self.__class__.__module__, self.__class__.__name__,
+             self._waiters, id(self)))
+
 
 class _PipeConditionWithMode(PipeCondition):
   __slots__ = [
@@ -450,12 +420,13 @@ class SharedLock(object):
     "__pending_by_prio",
     "__pending_shared",
     "__shr",
+    "__time_fn",
     "name",
     ]
 
   __condition_class = _PipeConditionWithMode
 
-  def __init__(self, name, monitor=None):
+  def __init__(self, name, monitor=None, _time_fn=time.time):
     """Construct a new SharedLock.
 
     @param name: the name of the lock
@@ -467,6 +438,9 @@ class SharedLock(object):
 
     self.name = name
 
+    # Used for unittesting
+    self.__time_fn = _time_fn
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -484,67 +458,68 @@ class SharedLock(object):
 
     # Register with lock monitor
     if monitor:
+      logging.debug("Adding lock %s to monitor", name)
       monitor.RegisterLock(self)
 
-  def GetInfo(self, fields):
+  def __repr__(self):
+    return ("<%s.%s name=%s at %#x>" %
+            (self.__class__.__module__, self.__class__.__name__,
+             self.name, id(self)))
+
+  def GetLockInfo(self, requested):
     """Retrieves information for querying locks.
 
-    @type fields: list of strings
-    @param fields: List of fields to return
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
 
     """
     self.__lock.acquire()
     try:
-      info = []
-
       # Note: to avoid unintentional race conditions, no references to
       # modifiable objects should be returned unless they were created in this
       # function.
-      for fname in fields:
-        if fname == "name":
-          info.append(self.name)
-        elif fname == "mode":
-          if self.__deleted:
-            info.append("deleted")
-            assert not (self.__exc or self.__shr)
-          elif self.__exc:
-            info.append(_EXCLUSIVE_TEXT)
-          elif self.__shr:
-            info.append(_SHARED_TEXT)
-          else:
-            info.append(None)
-        elif fname == "owner":
-          if self.__exc:
-            owner = [self.__exc]
-          else:
-            owner = self.__shr
-
-          if owner:
-            assert not self.__deleted
-            info.append([i.getName() for i in owner])
-          else:
-            info.append(None)
-        elif fname == "pending":
-          data = []
-
-          # Sorting instead of copying and using heaq functions for simplicity
-          for (_, prioqueue) in sorted(self.__pending):
-            for cond in prioqueue:
-              if cond.shared:
-                mode = _SHARED_TEXT
-              else:
-                mode = _EXCLUSIVE_TEXT
-
-              # This function should be fast as it runs with the lock held.
-              # Hence not using utils.NiceSort.
-              data.append((mode, sorted(i.getName()
-                                        for i in cond.get_waiting())))
-
-          info.append(data)
+      mode = None
+      owner_names = None
+
+      if query.LQ_MODE in requested:
+        if self.__deleted:
+          mode = _DELETED_TEXT
+          assert not (self.__exc or self.__shr)
+        elif self.__exc:
+          mode = _EXCLUSIVE_TEXT
+        elif self.__shr:
+          mode = _SHARED_TEXT
+
+      # Current owner(s) are wanted
+      if query.LQ_OWNER in requested:
+        if self.__exc:
+          owner = [self.__exc]
         else:
-          raise errors.OpExecError("Invalid query field '%s'" % fname)
+          owner = self.__shr
+
+        if owner:
+          assert not self.__deleted
+          owner_names = [i.getName() for i in owner]
+
+      # Pending acquires are wanted
+      if query.LQ_PENDING in requested:
+        pending = []
+
+        # Sorting instead of copying and using heaq functions for simplicity
+        for (_, prioqueue) in sorted(self.__pending):
+          for cond in prioqueue:
+            if cond.shared:
+              pendmode = _SHARED_TEXT
+            else:
+              pendmode = _EXCLUSIVE_TEXT
+
+            # List of names will be sorted in L{query._GetLockPending}
+            pending.append((pendmode, [i.getName()
+                                       for i in cond.get_waiting()]))
+      else:
+        pending = None
 
-      return info
+      return [(self.name, mode, owner_names, pending)]
     finally:
       self.__lock.release()
 
@@ -581,7 +556,7 @@ class SharedLock(object):
     else:
       return self.__is_exclusive()
 
-  def _is_owned(self, shared=-1):
+  def is_owned(self, shared=-1):
     """Is the current thread somehow owning the lock at this time?
 
     @param shared:
@@ -596,6 +571,10 @@ class SharedLock(object):
     finally:
       self.__lock.release()
 
+  #: Necessary to remain compatible with threading.Condition, which tries to
+  #: retrieve a locks' "_is_owned" attribute
+  _is_owned = is_owned
+
   def _count_pending(self):
     """Returns the number of pending acquires.
 
@@ -617,7 +596,9 @@ class SharedLock(object):
     self.__lock.acquire()
     try:
       # Order is important: __find_first_pending_queue modifies __pending
-      return not (self.__find_first_pending_queue() or
+      (_, prioqueue) = self.__find_first_pending_queue()
+
+      return not (prioqueue or
                   self.__pending or
                   self.__pending_by_prio or
                   self.__pending_shared)
@@ -651,16 +632,15 @@ class SharedLock(object):
     while self.__pending:
       (priority, prioqueue) = self.__pending[0]
 
-      if not prioqueue:
-        heapq.heappop(self.__pending)
-        del self.__pending_by_prio[priority]
-        assert priority not in self.__pending_shared
-        continue
-
       if prioqueue:
-        return prioqueue
+        return (priority, prioqueue)
 
-    return None
+      # Remove empty queue
+      heapq.heappop(self.__pending)
+      del self.__pending_by_prio[priority]
+      assert priority not in self.__pending_shared
+
+    return (None, None)
 
   def __is_on_top(self, cond):
     """Checks whether the passed condition is on top of the queue.
@@ -668,7 +648,9 @@ class SharedLock(object):
     The caller must make sure the queue isn't empty.
 
     """
-    return cond == self.__find_first_pending_queue()[0]
+    (_, prioqueue) = self.__find_first_pending_queue()
+
+    return cond == prioqueue[0]
 
   def __acquire_unlocked(self, shared, timeout, priority):
     """Acquire a shared lock.
@@ -695,6 +677,12 @@ class SharedLock(object):
       self.__do_acquire(shared)
       return True
 
+    # The lock couldn't be acquired right away, so if a timeout is given and is
+    # considered too short, return right away as scheduling a pending
+    # acquisition is quite expensive
+    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
+      return False
+
     prioqueue = self.__pending_by_prio.get(priority, None)
 
     if shared:
@@ -722,32 +710,37 @@ class SharedLock(object):
         assert priority not in self.__pending_shared
         self.__pending_shared[priority] = wait_condition
 
+    wait_start = self.__time_fn()
+    acquired = False
+
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
-      # TODO: Decrease timeout with spurious notifications
-      while not (self.__is_on_top(wait_condition) and
-                 self.__can_acquire(shared)):
-        # Wait for notification
-        wait_condition.wait(timeout)
-        self.__check_deleted()
+      while True:
+        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+          self.__do_acquire(shared)
+          acquired = True
+          break
 
-        # A lot of code assumes blocking acquires always succeed. Loop
-        # internally for that case.
-        if timeout is not None:
+        # A lot of code assumes blocking acquires always succeed, therefore we
+        # can never return False for a blocking acquire
+        if (timeout is not None and
+            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
           break
 
-      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
-        self.__do_acquire(shared)
-        return True
+        # Wait for notification
+        wait_condition.wait(timeout)
+        self.__check_deleted()
     finally:
       # Remove condition from queue if there are no more waiters
       if not wait_condition.has_waiting():
         prioqueue.remove(wait_condition)
         if wait_condition.shared:
-          del self.__pending_shared[priority]
+          # Remove from list of shared acquires if it wasn't while releasing
+          # (e.g. on lock deletion)
+          self.__pending_shared.pop(priority, None)
 
-    return False
+    return acquired
 
   def acquire(self, shared=0, timeout=None, priority=None,
               test_notify=None):
@@ -777,6 +770,48 @@ class SharedLock(object):
     finally:
       self.__lock.release()
 
+  def downgrade(self):
+    """Changes the lock mode from exclusive to shared.
+
+    Pending acquires in shared mode on the same priority will go ahead.
+
+    """
+    self.__lock.acquire()
+    try:
+      assert self.__is_owned(), "Lock must be owned"
+
+      if self.__is_exclusive():
+        # Do nothing if the lock is already acquired in shared mode
+        self.__exc = None
+        self.__do_acquire(1)
+
+        # Important: pending shared acquires should only jump ahead if there
+        # was a transition from exclusive to shared, otherwise an owner of a
+        # shared lock can keep calling this function to push incoming shared
+        # acquires
+        (priority, prioqueue) = self.__find_first_pending_queue()
+        if prioqueue:
+          # Is there a pending shared acquire on this priority?
+          cond = self.__pending_shared.pop(priority, None)
+          if cond:
+            assert cond.shared
+            assert cond in prioqueue
+
+            # Ensure shared acquire is on top of queue
+            if len(prioqueue) > 1:
+              prioqueue.remove(cond)
+              prioqueue.insert(0, cond)
+
+            # Notify
+            cond.notifyAll()
+
+      assert not self.__is_exclusive()
+      assert self.__is_sharer()
+
+      return True
+    finally:
+      self.__lock.release()
+
   def release(self):
     """Release a Shared Lock.
 
@@ -792,14 +827,38 @@ class SharedLock(object):
       # Autodetect release type
       if self.__is_exclusive():
         self.__exc = None
+        notify = True
       else:
         self.__shr.remove(threading.currentThread())
+        notify = not self.__shr
 
-      # Notify topmost condition in queue
-      prioqueue = self.__find_first_pending_queue()
-      if prioqueue:
-        prioqueue[0].notifyAll()
+      # Notify topmost condition in queue if there are no owners left (for
+      # shared locks)
+      if notify:
+        self.__notify_topmost()
+    finally:
+      self.__lock.release()
+
+  def __notify_topmost(self):
+    """Notifies topmost condition in queue of pending acquires.
 
+    """
+    (priority, prioqueue) = self.__find_first_pending_queue()
+    if prioqueue:
+      cond = prioqueue[0]
+      cond.notifyAll()
+      if cond.shared:
+        # Prevent further shared acquires from sneaking in while waiters are
+        # notified
+        self.__pending_shared.pop(priority, None)
+
+  def _notify_topmost(self):
+    """Exported version of L{__notify_topmost}.
+
+    """
+    self.__lock.acquire()
+    try:
+      return self.__notify_topmost()
     finally:
       self.__lock.release()
 
@@ -831,10 +890,10 @@ class SharedLock(object):
       if not acquired:
         acquired = self.__acquire_unlocked(0, timeout, priority)
 
+      if acquired:
         assert self.__is_exclusive() and not self.__is_sharer(), \
           "Lock wasn't acquired in exclusive mode"
 
-      if acquired:
         self.__deleted = True
         self.__exc = None
 
@@ -865,6 +924,52 @@ class SharedLock(object):
 ALL_SET = None
 
 
+def _TimeoutZero():
+  """Returns the number zero.
+
+  """
+  return 0
+
+
+def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
+  """Determines modes and timeouts for L{LockSet.acquire}.
+
+  @type want_all: boolean
+  @param want_all: Whether all locks in set should be acquired
+  @param timeout: Timeout in seconds or C{None}
+  @param opportunistic: Whther locks should be acquired opportunistically
+  @rtype: tuple
+  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
+    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
+    acquiring the lockset-internal lock (might be C{None}) and a function to
+    calculate the timeout for acquiring individual locks
+
+  """
+  # Short circuit when no running timeout is needed
+  if opportunistic and not want_all:
+    assert timeout is None, "Got timeout for an opportunistic acquisition"
+    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
+
+  # We need to keep track of how long we spent waiting for a lock. The
+  # timeout passed to this function is over all lock acquisitions.
+  running_timeout = utils.RunningTimeout(timeout, False)
+
+  if want_all:
+    mode = _LS_ACQUIRE_ALL
+    ls_timeout_fn = running_timeout.Remaining
+  else:
+    mode = _LS_ACQUIRE_EXACT
+    ls_timeout_fn = None
+
+  if opportunistic:
+    mode = _LS_ACQUIRE_OPPORTUNISTIC
+    timeout_fn = _TimeoutZero
+  else:
+    timeout_fn = running_timeout.Remaining
+
+  return (mode, ls_timeout_fn, timeout_fn)
+
+
 class _AcquireTimeout(Exception):
   """Internal exception to abort an acquire on a timeout.
 
@@ -900,8 +1005,8 @@ class LockSet:
     # Lock monitor
     self.__monitor = monitor
 
-    # Used internally to guarantee coherency.
-    self.__lock = SharedLock(name)
+    # Used internally to guarantee coherency
+    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
@@ -926,17 +1031,80 @@ class LockSet:
     """
     return "%s/%s" % (self.name, mname)
 
-  def _is_owned(self):
-    """Is the current thread a current level owner?"""
+  def _get_lock(self):
+    """Returns the lockset-internal lock.
+
+    """
+    return self.__lock
+
+  def _get_lockdict(self):
+    """Returns the lockset-internal lock dictionary.
+
+    Accessing this structure is only safe in single-thread usage or when the
+    lockset-internal lock is held.
+
+    """
+    return self.__lockdict
+
+  def is_owned(self):
+    """Is the current thread a current level owner?
+
+    @note: Use L{check_owned} to check if a specific lock is held
+
+    """
     return threading.currentThread() in self.__owners
 
+  def check_owned(self, names, shared=-1):
+    """Check if locks are owned in a specific mode.
+
+    @type names: sequence or string
+    @param names: Lock names (or a single lock name)
+    @param shared: See L{SharedLock.is_owned}
+    @rtype: bool
+    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
+      L{list_owned} to get the names of all owned locks
+
+    """
+    if isinstance(names, basestring):
+      names = [names]
+
+    # Avoid check if no locks are owned anyway
+    if names and self.is_owned():
+      candidates = []
+
+      # Gather references to all locks (in case they're deleted in the meantime)
+      for lname in names:
+        try:
+          lock = self.__lockdict[lname]
+        except KeyError:
+          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
+                                 " have been removed)" % (lname, self.name))
+        else:
+          candidates.append(lock)
+
+      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
+    else:
+      return False
+
+  def owning_all(self):
+    """Checks whether current thread owns internal lock.
+
+    Holding the internal lock is equivalent with holding all locks in the set
+    (the opposite does not necessarily hold as it can not be easily
+    determined). L{add} and L{remove} require the internal lock.
+
+    @rtype: boolean
+
+    """
+    return self.__lock.is_owned()
+
   def _add_owned(self, name=None):
     """Note the current thread owns the given lock"""
     if name is None:
-      if not self._is_owned():
+      if not self.is_owned():
         self.__owners[threading.currentThread()] = set()
     else:
-      if self._is_owned():
+      if self.is_owned():
         self.__owners[threading.currentThread()].add(name)
       else:
         self.__owners[threading.currentThread()] = set([name])
@@ -944,29 +1112,29 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
 
-    assert not (name is None and self.__lock._is_owned()), \
+    assert not (name is None and self.__lock.is_owned()), \
            "Cannot hold internal lock when deleting owner status"
 
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
     # Only remove the key if we don't hold the set-lock as well
-    if (not self.__lock._is_owned() and
-        not self.__owners[threading.currentThread()]):
+    if not (self.__lock.is_owned() or
+            self.__owners[threading.currentThread()]):
       del self.__owners[threading.currentThread()]
 
-  def _list_owned(self):
+  def list_owned(self):
     """Get the set of resource names owned by the current thread"""
-    if self._is_owned():
+    if self.is_owned():
       return self.__owners[threading.currentThread()].copy()
     else:
       return set()
 
   def _release_and_delete_owned(self):
     """Release and delete all resources owned by the current thread"""
-    for lname in self._list_owned():
+    for lname in self.list_owned():
       lock = self.__lockdict[lname]
-      if lock._is_owned():
+      if lock.is_owned():
         lock.release()
       self._del_owned(name=lname)
 
@@ -988,7 +1156,7 @@ class LockSet:
     # If we don't already own the set-level lock acquired
     # we'll get it and note we need to release it later.
     release_lock = False
-    if not self.__lock._is_owned():
+    if not self.__lock.is_owned():
       release_lock = True
       self.__lock.acquire(shared=1)
     try:
@@ -999,9 +1167,12 @@ class LockSet:
     return set(result)
 
   def acquire(self, names, timeout=None, shared=0, priority=None,
-              test_notify=None):
+              opportunistic=False, test_notify=None):
     """Acquire a set of resource locks.
 
+    @note: When acquiring locks opportunistically, any number of locks might
+      actually be acquired, even zero.
+
     @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
@@ -1009,9 +1180,16 @@ class LockSet:
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float or None
-    @param timeout: Maximum time to acquire all locks
+    @param timeout: Maximum time to acquire all locks; for opportunistic
+      acquisitions, a timeout can only be given when C{names} is C{None}, in
+      which case it is exclusively used for acquiring the L{LockSet}-internal
+      lock; opportunistic acquisitions don't use a timeout for acquiring
+      individual locks
     @type priority: integer
     @param priority: Priority for acquiring locks
+    @type opportunistic: boolean
+    @param opportunistic: Acquire locks opportunistically; use the return value
+      to determine which locks were actually acquired
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
@@ -1025,26 +1203,32 @@ class LockSet:
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
-                                  " (lockset %s)" % self.name)
+    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
+                                 " (lockset %s)" % self.name)
 
     if priority is None:
       priority = _DEFAULT_PRIORITY
 
-    # We need to keep track of how long we spent waiting for a lock. The
-    # timeout passed to this function is over all lock acquires.
-    running_timeout = RunningTimeout(timeout, False)
-
     try:
       if names is not None:
+        assert timeout is None or not opportunistic, \
+          ("Opportunistic acquisitions can only use a timeout if no"
+           " names are given; see docstring for details")
+
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
           names = [names]
 
-        return self.__acquire_inner(names, False, shared, priority,
-                                    running_timeout.Remaining, test_notify)
+        (mode, _, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
+
+        return self.__acquire_inner(names, mode, shared, priority,
+                                    timeout_fn, test_notify)
 
       else:
+        (mode, ls_timeout_fn, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
+
         # If no names are given acquire the whole set by not letting new names
         # being added before we release, and getting the current list of names.
         # Some of them may then be deleted later, but we'll cope with this.
@@ -1055,14 +1239,15 @@ class LockSet:
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
         if not self.__lock.acquire(shared=shared, priority=priority,
-                                   timeout=running_timeout.Remaining()):
+                                   timeout=ls_timeout_fn()):
           raise _AcquireTimeout()
+
         try:
           # note we own the set-lock
           self._add_owned()
 
-          return self.__acquire_inner(self.__names(), True, shared, priority,
-                                      running_timeout.Remaining, test_notify)
+          return self.__acquire_inner(self.__names(), mode, shared,
+                                      priority, timeout_fn, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
@@ -1074,37 +1259,50 @@ class LockSet:
     except _AcquireTimeout:
       return None
 
-  def __acquire_inner(self, names, want_all, shared, priority,
+  def __acquire_inner(self, names, mode, shared, priority,
                       timeout_fn, test_notify):
     """Inner logic for acquiring a number of locks.
 
+    Acquisition modes:
+
+      - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
+        deleted locks can be ignored as the whole set is being acquired with
+        its internal lock held
+      - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
+        timeouts and deleted locks are fatal
+      - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
+        all within the set) which should be acquired opportunistically, that is
+        failures are ignored
+
     @param names: Names of the locks to be acquired
-    @param want_all: Whether all locks in the set should be acquired
+    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
     @param shared: Whether to acquire in shared mode
-    @param timeout_fn: Function returning remaining timeout
+    @param timeout_fn: Function returning remaining timeout (C{None} for
+      opportunistic acquisitions)
     @param priority: Priority for acquiring locks
     @param test_notify: Special callback function for unittesting
 
     """
+    assert mode in _LS_ACQUIRE_MODES
+
     acquire_list = []
 
     # First we look the locks up on __lockdict. We have no way of being sure
     # they will still be there after, but this makes it a lot faster should
     # just one of them be the already wrong. Using a sorted sequence to prevent
     # deadlocks.
-    for lname in sorted(utils.UniqueSequence(names)):
+    for lname in sorted(frozenset(names)):
       try:
         lock = self.__lockdict[lname] # raises KeyError if lock is not there
       except KeyError:
-        if want_all:
-          # We are acquiring all the set, it doesn't matter if this particular
-          # element is not there anymore.
-          continue
-
-        raise errors.LockError("Non-existing lock %s in set %s" %
-                               (lname, self.name))
-
-      acquire_list.append((lname, lock))
+        # We are acquiring the whole set, it doesn't matter if this particular
+        # element is not there anymore. If, however, only certain names should
+        # be acquired, not finding a lock is an error.
+        if mode == _LS_ACQUIRE_EXACT:
+          raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
+                                 " been removed)" % (lname, self.name))
+      else:
+        acquire_list.append((lname, lock))
 
     # This will hold the locknames we effectively acquired.
     acquired = set()
@@ -1129,16 +1327,20 @@ class LockSet:
                                      priority=priority,
                                      test_notify=test_notify_fn)
         except errors.LockError:
-          if want_all:
-            # We are acquiring all the set, it doesn't matter if this
+          if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
+            # We are acquiring the whole set, it doesn't matter if this
             # particular element is not there anymore.
             continue
 
-          raise errors.LockError("Non-existing lock %s in set %s" %
-                                 (lname, self.name))
+          raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
+                                 " been removed)" % (lname, self.name))
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
+          if mode == _LS_ACQUIRE_OPPORTUNISTIC:
+            # Ignore timeouts on opportunistic acquisitions
+            continue
+
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
@@ -1156,7 +1358,7 @@ class LockSet:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
           # Of course something is going to be really wrong after this.
-          if lock._is_owned():
+          if lock.is_owned():
             lock.release()
           raise
 
@@ -1167,6 +1369,42 @@ class LockSet:
 
     return acquired
 
+  def downgrade(self, names=None):
+    """Downgrade a set of resource locks from exclusive to shared mode.
+
+    The locks must have been acquired in exclusive mode.
+
+    """
+    assert self.is_owned(), ("downgrade on lockset %s while not owning any"
+                             " lock" % self.name)
+
+    # Support passing in a single resource to downgrade rather than many
+    if isinstance(names, basestring):
+      names = [names]
+
+    owned = self.list_owned()
+
+    if names is None:
+      names = owned
+    else:
+      names = set(names)
+      assert owned.issuperset(names), \
+        ("downgrade() on unheld resources %s (set %s)" %
+         (names.difference(owned), self.name))
+
+    for lockname in names:
+      self.__lockdict[lockname].downgrade()
+
+    # Do we own the lockset in exclusive mode?
+    if self.__lock.is_owned(shared=0):
+      # Have all locks been downgraded?
+      if not compat.any(lock.is_owned(shared=0)
+                        for lock in self.__lockdict.values()):
+        self.__lock.downgrade()
+        assert self.__lock.is_owned(shared=1)
+
+    return True
+
   def release(self, names=None):
     """Release a set of resource locks, at the same level.
 
@@ -1178,24 +1416,24 @@ class LockSet:
         (defaults to all the locks acquired at that level).
 
     """
-    assert self._is_owned(), ("release() on lock set %s while not owner" %
-                              self.name)
+    assert self.is_owned(), ("release() on lock set %s while not owner" %
+                             self.name)
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
       names = [names]
 
     if names is None:
-      names = self._list_owned()
+      names = self.list_owned()
     else:
       names = set(names)
-      assert self._list_owned().issuperset(names), (
+      assert self.list_owned().issuperset(names), (
                "release() on unheld resources %s (set %s)" %
-               (names.difference(self._list_owned()), self.name))
+               (names.difference(self.list_owned()), self.name))
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
-    if self.__lock._is_owned():
+    if self.__lock.is_owned():
       self.__lock.release()
       self._del_owned()
 
@@ -1217,7 +1455,7 @@ class LockSet:
 
     """
     # Check we don't already own locks at this level
-    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
+    assert not self.is_owned() or self.__lock.is_owned(shared=0), \
       ("Cannot add locks if the set %s is only partially owned, or shared" %
        self.name)
 
@@ -1228,7 +1466,7 @@ class LockSet:
     # If we don't already own the set-level lock acquired in an exclusive way
     # we'll get it and note we need to release it later.
     release_lock = False
-    if not self.__lock._is_owned():
+    if not self.__lock.is_owned():
       release_lock = True
       self.__lock.acquire()
 
@@ -1291,7 +1529,7 @@ class LockSet:
     # If we own any subset of this lock it must be a superset of what we want
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
-    assert not self._is_owned() or self._list_owned().issuperset(names), (
+    assert not self.is_owned() or self.list_owned().issuperset(names), (
       "remove() on acquired lockset %s while not owning all elements" %
       self.name)
 
@@ -1308,8 +1546,8 @@ class LockSet:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
-        assert not self._is_owned(), ("remove failed while holding lockset %s"
-                                      % self.name)
+        assert not self.is_owned(), ("remove failed while holding lockset %s" %
+                                     self.name)
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
@@ -1320,40 +1558,71 @@ class LockSet:
         # it's the job of the one who actually deleted it.
         del self.__lockdict[lname]
         # And let's remove it from our private list if we owned it.
-        if self._is_owned():
+        if self.is_owned():
           self._del_owned(name=lname)
 
     return removed
 
 
-# Locking levels, must be acquired in increasing order.
-# Current rules are:
-#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
-#   acquired before performing any operation, either in shared or in exclusive
-#   mode. acquiring the BGL in exclusive mode is discouraged and should be
-#   avoided.
-#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
-#   If you need more than one node, or more than one instance, acquire them at
-#   the same time.
-LEVEL_CLUSTER = 0
-LEVEL_INSTANCE = 1
-LEVEL_NODE = 2
-
-LEVELS = [LEVEL_CLUSTER,
-          LEVEL_INSTANCE,
-          LEVEL_NODE]
+# Locking levels, must be acquired in increasing order. Current rules are:
+# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+#   acquired before performing any operation, either in shared or exclusive
+#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
+#   avoided..
+# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
+#   you need more than one node, or more than one instance, acquire them at the
+#   same time.
+# - LEVEL_NODE_RES is for node resources and should be used by operations with
+#   possibly high impact on the node's disks.
+# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
+#   ("NAL" is the only lock at this level). It should be acquired in shared
+#   mode when an opcode blocks all or a significant amount of a cluster's
+#   locks. Opcodes doing instance allocations should acquire in exclusive mode.
+#   Once the set of acquired locks for an opcode has been reduced to the working
+#   set, the NAL should be released as well to allow allocations to proceed.
+(LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODE_ALLOC,
+ LEVEL_NODEGROUP,
+ LEVEL_NODE,
+ LEVEL_NODE_RES,
+ LEVEL_NETWORK) = range(0, 7)
+
+LEVELS = [
+  LEVEL_CLUSTER,
+  LEVEL_INSTANCE,
+  LEVEL_NODE_ALLOC,
+  LEVEL_NODEGROUP,
+  LEVEL_NODE,
+  LEVEL_NODE_RES,
+  LEVEL_NETWORK,
+  ]
 
 # Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
-
+LEVELS_MOD = compat.UniqueFrozenset([
+  LEVEL_NODE_RES,
+  LEVEL_NODE,
+  LEVEL_NODEGROUP,
+  LEVEL_INSTANCE,
+  LEVEL_NETWORK,
+  ])
+
+#: Lock level names (make sure to use singular form)
 LEVEL_NAMES = {
   LEVEL_CLUSTER: "cluster",
   LEVEL_INSTANCE: "instance",
+  LEVEL_NODE_ALLOC: "node-alloc",
+  LEVEL_NODEGROUP: "nodegroup",
   LEVEL_NODE: "node",
+  LEVEL_NODE_RES: "node-res",
+  LEVEL_NETWORK: "network",
   }
 
 # Constant for the big ganeti lock
-BGL = 'BGL'
+BGL = "BGL"
+
+#: Node allocation lock
+NAL = "NAL"
 
 
 class GanetiLockManager:
@@ -1367,14 +1636,15 @@ class GanetiLockManager:
   """
   _instance = None
 
-  def __init__(self, nodes=None, instances=None):
+  def __init__(self, node_uuids, nodegroups, instance_names, networks):
     """Constructs a new GanetiLockManager object.
 
     There should be only a GanetiLockManager object at any time, so this
     function raises an error if this is not the case.
 
-    @param nodes: list of node names
-    @param instances: list of instance names
+    @param node_uuids: list of node UUIDs
+    @param nodegroups: list of nodegroup uuids
+    @param instance_names: list of instance names
 
     """
     assert self.__class__._instance is None, \
@@ -1387,19 +1657,35 @@ class GanetiLockManager:
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
-      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
-      LEVEL_INSTANCE: LockSet(instances, "instances",
+      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
+      LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
+      LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
+      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instance_names, "instance",
                               monitor=self._monitor),
+      LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
+      LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
       }
 
-  def QueryLocks(self, fields, sync):
+    assert compat.all(ls.name == LEVEL_NAMES[level]
+                      for (level, ls) in self.__keyring.items()), \
+      "Keyring name mismatch"
+
+  def AddToLockMonitor(self, provider):
+    """Registers a new lock with the monitor.
+
+    See L{LockMonitor.RegisterLock}.
+
+    """
+    return self._monitor.RegisterLock(provider)
+
+  def QueryLocks(self, fields):
     """Queries information from all locks.
 
     See L{LockMonitor.QueryLocks}.
 
     """
-    return self._monitor.QueryLocks(fields, sync)
+    return self._monitor.QueryLocks(fields)
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1412,19 +1698,33 @@ class GanetiLockManager:
     assert level in LEVELS, "Invalid locking level %s" % level
     return self.__keyring[level]._names()
 
-  def _is_owned(self, level):
+  def is_owned(self, level):
     """Check whether we are owning locks at the given level
 
     """
-    return self.__keyring[level]._is_owned()
-
-  is_owned = _is_owned
+    return self.__keyring[level].is_owned()
 
-  def _list_owned(self, level):
+  def list_owned(self, level):
     """Get the set of owned locks at the given level
 
     """
-    return self.__keyring[level]._list_owned()
+    return self.__keyring[level].list_owned()
+
+  def check_owned(self, level, names, shared=-1):
+    """Check if locks at a certain level are owned in a specific mode.
+
+    @see: L{LockSet.check_owned}
+
+    """
+    return self.__keyring[level].check_owned(names, shared=shared)
+
+  def owning_all(self, level):
+    """Checks whether current thread owns all locks at a certain level.
+
+    @see: L{LockSet.owning_all}
+
+    """
+    return self.__keyring[level].owning_all()
 
   def _upper_owned(self, level):
     """Check that we don't own any lock at a level greater than the given one.
@@ -1432,18 +1732,18 @@ class GanetiLockManager:
     """
     # This way of checking only works if LEVELS[i] = i, which we check for in
     # the test cases.
-    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+    return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
 
-  def _BGL_owned(self): # pylint: disable-msg=C0103
+  def _BGL_owned(self): # pylint: disable=C0103
     """Check if the current thread owns the BGL.
 
     Both an exclusive or a shared acquisition work.
 
     """
-    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
 
   @staticmethod
-  def _contains_BGL(level, names): # pylint: disable-msg=C0103
+  def _contains_BGL(level, names): # pylint: disable=C0103
     """Check if the level contains the BGL.
 
     Check if acting on the given level and set of names will change
@@ -1452,7 +1752,8 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, timeout=None, shared=0):
+  def acquire(self, level, names, timeout=None, shared=0, priority=None,
+              opportunistic=False):
     """Acquire a set of resource locks, at the same level.
 
     @type level: member of locking.LEVELS
@@ -1465,6 +1766,11 @@ class GanetiLockManager:
         an exclusive lock will be acquired
     @type timeout: float
     @param timeout: Maximum time to acquire all locks
+    @type priority: integer
+    @param priority: Priority for acquiring lock
+    @type opportunistic: boolean
+    @param opportunistic: Acquire locks opportunistically; use the return value
+      to determine which locks were actually acquired
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1476,14 +1782,32 @@ class GanetiLockManager:
     # point in acquiring any other lock, unless perhaps we are half way through
     # the migration of the current opcode.
     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
-            "You must own the Big Ganeti Lock before acquiring any other")
+      "You must own the Big Ganeti Lock before acquiring any other")
 
     # Check we don't own locks at the same or upper levels.
     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
-           " while owning some at a greater one")
+                                          " while owning some at a greater one")
 
     # Acquire the locks in the set.
-    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
+                                         priority=priority,
+                                         opportunistic=opportunistic)
+
+  def downgrade(self, level, names=None):
+    """Downgrade a set of resource locks from exclusive to shared mode.
+
+    You must have acquired the locks in exclusive mode.
+
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be downgraded
+    @type names: list of strings, or None
+    @param names: the names of the locks which shall be downgraded
+        (defaults to all the locks acquired at the level)
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+
+    return self.__keyring[level].downgrade(names=names)
 
   def release(self, level, names=None):
     """Release a set of resource locks, at the same level.
@@ -1501,10 +1825,10 @@ class GanetiLockManager:
     assert level in LEVELS, "Invalid locking level %s" % level
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
-            "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels (%r)" %
-            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
-                              for i in self.__keyring.keys()]), ))
+              "Cannot release the Big Ganeti Lock while holding something"
+              " at upper levels (%r)" %
+              (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
+                                for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1524,9 +1848,9 @@ class GanetiLockManager:
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
     assert self._BGL_owned(), ("You must own the BGL before performing other"
-           " operations")
+                               " operations")
     assert not self._upper_owned(level), ("Cannot add locks at a level"
-           " while owning some at a greater one")
+                                          " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
   def remove(self, level, names):
@@ -1544,16 +1868,29 @@ class GanetiLockManager:
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
     assert self._BGL_owned(), ("You must own the BGL before performing other"
-           " operations")
+                               " operations")
     # Check we either own the level or don't own anything from here
     # up. LockSet.remove() will check the case in which we don't own
     # all the needed resources, or we have a shared ownership.
-    assert self._is_owned(level) or not self._upper_owned(level), (
+    assert self.is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     return self.__keyring[level].remove(names)
 
 
+def _MonitorSortKey((item, idx, num)):
+  """Sorting key function.
+
+  Sort by name, registration order and then order of information. This provides
+  a stable sort order over different providers, even if they return the same
+  name.
+
+  """
+  (name, _, _, _) = item
+
+  return (utils.NiceSortKey(name), num, idx)
+
+
 class LockMonitor(object):
   _LOCK_ATTR = "_lock"
 
@@ -1563,48 +1900,76 @@ class LockMonitor(object):
     """
     self._lock = SharedLock("LockMonitor")
 
+    # Counter for stable sorting
+    self._counter = itertools.count(0)
+
     # Tracked locks. Weak references are used to avoid issues with circular
     # references and deletion.
     self._locks = weakref.WeakKeyDictionary()
 
   @ssynchronized(_LOCK_ATTR)
-  def RegisterLock(self, lock):
+  def RegisterLock(self, provider):
     """Registers a new lock.
 
+    @param provider: Object with a callable method named C{GetLockInfo}, taking
+      a single C{set} containing the requested information items
+    @note: It would be nicer to only receive the function generating the
+      requested information but, as it turns out, weak references to bound
+      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
+      workarounds, but none of the ones I found works properly in combination
+      with a standard C{WeakKeyDictionary}
+
     """
-    logging.debug("Registering lock %s", lock.name)
-    assert lock not in self._locks, "Duplicate lock registration"
-    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
-           "Found duplicate lock name"
-    self._locks[lock] = None
+    assert provider not in self._locks, "Duplicate registration"
 
-  @ssynchronized(_LOCK_ATTR)
-  def _GetLockInfo(self, fields):
-    """Get information from all locks while the monitor lock is held.
+    # There used to be a check for duplicate names here. As it turned out, when
+    # a lock is re-created with the same name in a very short timeframe, the
+    # previous instance might not yet be removed from the weakref dictionary.
+    # By keeping track of the order of incoming registrations, a stable sort
+    # ordering can still be guaranteed.
 
-    """
-    result = {}
+    self._locks[provider] = self._counter.next()
 
-    for lock in self._locks.keys():
-      assert lock.name not in result, "Found duplicate lock name"
-      result[lock.name] = lock.GetInfo(fields)
+  def _GetLockInfo(self, requested):
+    """Get information from all locks.
 
-    return result
+    """
+    # Must hold lock while getting consistent list of tracked items
+    self._lock.acquire(shared=1)
+    try:
+      items = self._locks.items()
+    finally:
+      self._lock.release()
 
-  def QueryLocks(self, fields, sync):
+    return [(info, idx, num)
+            for (provider, num) in items
+            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
+
+  def _Query(self, fields):
     """Queries information from all locks.
 
     @type fields: list of strings
     @param fields: List of fields to return
-    @type sync: boolean
-    @param sync: Whether to operate in synchronous mode
 
     """
-    if sync:
-      raise NotImplementedError("Synchronous queries are not implemented")
+    qobj = query.Query(query.LOCK_FIELDS, fields)
+
+    # Get all data with internal lock held and then sort by name and incoming
+    # order
+    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
+                      key=_MonitorSortKey)
+
+    # Extract lock information and build query data
+    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
 
-    # Get all data without sorting
-    result = self._GetLockInfo(fields)
+  def QueryLocks(self, fields):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    (qobj, ctx) = self._Query(fields)
 
-    # Sort by name
-    return [result[name] for name in utils.NiceSort(result.keys())]
+    # Prepare query response
+    return query.GetQueryResponse(qobj, ctx)