utils.log: Split formatter building into separate function
[ganeti-local] / lib / locking.py
index 78fdd93..3575085 100644 (file)
 import os
 import select
 import threading
-import time
 import errno
 import weakref
 import logging
 import heapq
+import operator
 
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
+from ganeti import query
 
 
 _EXCLUSIVE_TEXT = "exclusive"
 _SHARED_TEXT = "shared"
+_DELETED_TEXT = "deleted"
 
 _DEFAULT_PRIORITY = 0
 
@@ -73,59 +75,6 @@ def ssynchronized(mylock, shared=0):
   return wrap
 
 
-class RunningTimeout(object):
-  """Class to calculate remaining timeout when doing several operations.
-
-  """
-  __slots__ = [
-    "_allow_negative",
-    "_start_time",
-    "_time_fn",
-    "_timeout",
-    ]
-
-  def __init__(self, timeout, allow_negative, _time_fn=time.time):
-    """Initializes this class.
-
-    @type timeout: float
-    @param timeout: Timeout duration
-    @type allow_negative: bool
-    @param allow_negative: Whether to return values below zero
-    @param _time_fn: Time function for unittests
-
-    """
-    object.__init__(self)
-
-    if timeout is not None and timeout < 0.0:
-      raise ValueError("Timeout must not be negative")
-
-    self._timeout = timeout
-    self._allow_negative = allow_negative
-    self._time_fn = _time_fn
-
-    self._start_time = None
-
-  def Remaining(self):
-    """Returns the remaining timeout.
-
-    """
-    if self._timeout is None:
-      return None
-
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = self._time_fn()
-
-    # Calculate remaining time
-    remaining_timeout = self._start_time + self._timeout - self._time_fn()
-
-    if not self._allow_negative:
-      # Ensure timeout is always >= 0
-      return max(0.0, remaining_timeout)
-
-    return remaining_timeout
-
-
 class _SingleNotifyPipeConditionWaiter(object):
   """Helper class for SingleNotifyPipeCondition
 
@@ -155,7 +104,7 @@ class _SingleNotifyPipeConditionWaiter(object):
     @param timeout: Timeout for waiting (can be None)
 
     """
-    running_timeout = RunningTimeout(timeout, True)
+    running_timeout = utils.RunningTimeout(timeout, True)
 
     while True:
       remaining_time = running_timeout.Remaining()
@@ -486,65 +435,60 @@ class SharedLock(object):
     if monitor:
       monitor.RegisterLock(self)
 
-  def GetInfo(self, fields):
+  def GetInfo(self, requested):
     """Retrieves information for querying locks.
 
-    @type fields: list of strings
-    @param fields: List of fields to return
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
 
     """
     self.__lock.acquire()
     try:
-      info = []
-
       # Note: to avoid unintentional race conditions, no references to
       # modifiable objects should be returned unless they were created in this
       # function.
-      for fname in fields:
-        if fname == "name":
-          info.append(self.name)
-        elif fname == "mode":
-          if self.__deleted:
-            info.append("deleted")
-            assert not (self.__exc or self.__shr)
-          elif self.__exc:
-            info.append(_EXCLUSIVE_TEXT)
-          elif self.__shr:
-            info.append(_SHARED_TEXT)
-          else:
-            info.append(None)
-        elif fname == "owner":
-          if self.__exc:
-            owner = [self.__exc]
-          else:
-            owner = self.__shr
-
-          if owner:
-            assert not self.__deleted
-            info.append([i.getName() for i in owner])
-          else:
-            info.append(None)
-        elif fname == "pending":
-          data = []
-
-          # Sorting instead of copying and using heaq functions for simplicity
-          for (_, prioqueue) in sorted(self.__pending):
-            for cond in prioqueue:
-              if cond.shared:
-                mode = _SHARED_TEXT
-              else:
-                mode = _EXCLUSIVE_TEXT
-
-              # This function should be fast as it runs with the lock held.
-              # Hence not using utils.NiceSort.
-              data.append((mode, sorted(i.getName()
-                                        for i in cond.get_waiting())))
-
-          info.append(data)
+      mode = None
+      owner_names = None
+
+      if query.LQ_MODE in requested:
+        if self.__deleted:
+          mode = _DELETED_TEXT
+          assert not (self.__exc or self.__shr)
+        elif self.__exc:
+          mode = _EXCLUSIVE_TEXT
+        elif self.__shr:
+          mode = _SHARED_TEXT
+
+      # Current owner(s) are wanted
+      if query.LQ_OWNER in requested:
+        if self.__exc:
+          owner = [self.__exc]
         else:
-          raise errors.OpExecError("Invalid query field '%s'" % fname)
+          owner = self.__shr
+
+        if owner:
+          assert not self.__deleted
+          owner_names = [i.getName() for i in owner]
+
+      # Pending acquires are wanted
+      if query.LQ_PENDING in requested:
+        pending = []
+
+        # Sorting instead of copying and using heaq functions for simplicity
+        for (_, prioqueue) in sorted(self.__pending):
+          for cond in prioqueue:
+            if cond.shared:
+              pendmode = _SHARED_TEXT
+            else:
+              pendmode = _EXCLUSIVE_TEXT
+
+            # List of names will be sorted in L{query._GetLockPending}
+            pending.append((pendmode, [i.getName()
+                                       for i in cond.get_waiting()]))
+      else:
+        pending = None
 
-      return info
+      return (self.name, mode, owner_names, pending)
     finally:
       self.__lock.release()
 
@@ -1033,7 +977,7 @@ class LockSet:
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
-    running_timeout = RunningTimeout(timeout, False)
+    running_timeout = utils.RunningTimeout(timeout, False)
 
     try:
       if names is not None:
@@ -1337,18 +1281,21 @@ class LockSet:
 #   the same time.
 LEVEL_CLUSTER = 0
 LEVEL_INSTANCE = 1
-LEVEL_NODE = 2
+LEVEL_NODEGROUP = 2
+LEVEL_NODE = 3
 
 LEVELS = [LEVEL_CLUSTER,
           LEVEL_INSTANCE,
+          LEVEL_NODEGROUP,
           LEVEL_NODE]
 
 # Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
 
 LEVEL_NAMES = {
   LEVEL_CLUSTER: "cluster",
   LEVEL_INSTANCE: "instance",
+  LEVEL_NODEGROUP: "nodegroup",
   LEVEL_NODE: "node",
   }
 
@@ -1367,13 +1314,14 @@ class GanetiLockManager:
   """
   _instance = None
 
-  def __init__(self, nodes, instances):
+  def __init__(self, nodes, nodegroups, instances):
     """Constructs a new GanetiLockManager object.
 
     There should be only a GanetiLockManager object at any time, so this
     function raises an error if this is not the case.
 
     @param nodes: list of node names
+    @param nodegroups: list of nodegroup uuids
     @param instances: list of instance names
 
     """
@@ -1389,17 +1337,26 @@ class GanetiLockManager:
     self.__keyring = {
       LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
       LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
       LEVEL_INSTANCE: LockSet(instances, "instances",
                               monitor=self._monitor),
       }
 
-  def QueryLocks(self, fields, sync):
+  def QueryLocks(self, fields):
     """Queries information from all locks.
 
     See L{LockMonitor.QueryLocks}.
 
     """
-    return self._monitor.QueryLocks(fields, sync)
+    return self._monitor.QueryLocks(fields)
+
+  def OldStyleQueryLocks(self, fields):
+    """Queries information from all locks, returning old-style data.
+
+    See L{LockMonitor.OldStyleQueryLocks}.
+
+    """
+    return self._monitor.OldStyleQueryLocks(fields)
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1582,32 +1539,46 @@ class LockMonitor(object):
     self._locks[lock] = None
 
   @ssynchronized(_LOCK_ATTR)
-  def _GetLockInfo(self, fields):
+  def _GetLockInfo(self, requested):
     """Get information from all locks while the monitor lock is held.
 
     """
-    result = {}
+    return [lock.GetInfo(requested) for lock in self._locks.keys()]
+
+  def _Query(self, fields):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    qobj = query.Query(query.LOCK_FIELDS, fields)
 
-    for lock in self._locks.keys():
-      assert lock.name not in result, "Found duplicate lock name"
-      result[lock.name] = lock.GetInfo(fields)
+    # Get all data and sort by name
+    lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()),
+                              key=operator.itemgetter(0))
 
-    return result
+    return (qobj, query.LockQueryData(lockinfo))
 
-  def QueryLocks(self, fields, sync):
+  def QueryLocks(self, fields):
     """Queries information from all locks.
 
     @type fields: list of strings
     @param fields: List of fields to return
-    @type sync: boolean
-    @param sync: Whether to operate in synchronous mode
 
     """
-    if sync:
-      raise NotImplementedError("Synchronous queries are not implemented")
+    (qobj, ctx) = self._Query(fields)
 
-    # Get all data without sorting
-    result = self._GetLockInfo(fields)
+    # Prepare query response
+    return query.GetQueryResponse(qobj, ctx)
+
+  def OldStyleQueryLocks(self, fields):
+    """Queries information from all locks, returning old-style data.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    (qobj, ctx) = self._Query(fields)
 
-    # Sort by name
-    return [result[name] for name in utils.NiceSort(result.keys())]
+    return qobj.OldStyleQuery(ctx)