opcodes: document OP_DSC_FIELD in OpCode and OpCode.Summary()
[ganeti-local] / lib / mcpu.py
index 7248f93..a724eb9 100644 (file)
@@ -38,10 +38,11 @@ from ganeti import errors
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
+from ganeti import utils
 
 
-class _LockAcquireTimeout(Exception):
-  """Internal exception to report timeouts on acquiring locks.
+class LockAcquireTimeout(Exception):
+  """Exception to report timeouts on acquiring locks.
 
   """
 
@@ -71,60 +72,40 @@ def _CalculateLockAttemptTimeouts():
   return result
 
 
-class _LockAttemptTimeoutStrategy(object):
+class LockAttemptTimeoutStrategy(object):
   """Class with lock acquire timeout strategy.
 
   """
   __slots__ = [
-    "_attempt",
+    "_timeouts",
     "_random_fn",
-    "_start_time",
     "_time_fn",
-    "_running_timeout",
     ]
 
   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
 
-  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
+  def __init__(self, _time_fn=time.time, _random_fn=random.random):
     """Initializes this class.
 
-    @type attempt: int
-    @param attempt: Current attempt number
     @param _time_fn: Time function for unittests
     @param _random_fn: Random number generator for unittests
 
     """
     object.__init__(self)
 
-    if attempt < 0:
-      raise ValueError("Attempt must be zero or positive")
-
-    self._attempt = attempt
+    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
     self._time_fn = _time_fn
     self._random_fn = _random_fn
 
-    try:
-      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
-    except IndexError:
-      # No more timeouts, do blocking acquire
-      timeout = None
-
-    self._running_timeout = locking.RunningTimeout(timeout, False,
-                                                   _time_fn=_time_fn)
-
   def NextAttempt(self):
-    """Returns the strategy for the next attempt.
+    """Returns the timeout for the next attempt.
 
     """
-    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
-                                       _time_fn=self._time_fn,
-                                       _random_fn=self._random_fn)
-
-  def CalcRemainingTimeout(self):
-    """Returns the remaining timeout.
-
-    """
-    timeout = self._running_timeout.Remaining()
+    try:
+      timeout = self._timeouts.next()
+    except StopIteration:
+      # No more timeouts, do blocking acquire
+      timeout = None
 
     if timeout is not None:
       # Add a small variation (-/+ 5%) to timeout. This helps in situations
@@ -173,6 +154,8 @@ class Processor(object):
     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
+    opcodes.OpQuery: cmdlib.LUQuery,
+    opcodes.OpQueryFields: cmdlib.LUQueryFields,
     # node lu
     opcodes.OpAddNode: cmdlib.LUAddNode,
     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
@@ -205,6 +188,8 @@ class Processor(object):
     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
+    # node group lu
+    opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
     # os lu
     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
     # exports lu
@@ -238,7 +223,7 @@ class Processor(object):
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
 
-  def _AcquireLocks(self, level, names, shared, timeout):
+  def _AcquireLocks(self, level, names, shared, timeout, priority):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -249,13 +234,18 @@ class Processor(object):
     @param shared: Whether the locks should be acquired in shared mode
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
 
     """
     if self._cbs:
       self._cbs.CheckCancel()
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout)
+                                        timeout=timeout, priority=priority)
+
+    if acquired is None:
+      raise LockAcquireTimeout()
 
     return acquired
 
@@ -290,7 +280,7 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level, calc_timeout):
+  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -325,11 +315,7 @@ class Processor(object):
           needed_locks = lu.needed_locks[level]
 
           acquired = self._AcquireLocks(level, needed_locks, share,
-                                        calc_timeout())
-
-          if acquired is None:
-            raise _LockAcquireTimeout()
-
+                                        calc_timeout(), priority)
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -348,7 +334,7 @@ class Processor(object):
         try:
           lu.acquired_locks[level] = acquired
 
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
@@ -357,63 +343,59 @@ class Processor(object):
           self.context.glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
 
     return result
 
-  def ExecOpCode(self, op, cbs):
+  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
     @param op: the opcode to be executed
     @type cbs: L{OpExecCbBase}
     @param cbs: Runtime callbacks
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks, None for no timeout
+    @type priority: number or None
+    @param priority: Priority for acquiring lock(s)
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
 
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
                                    " to ExecOpcode")
 
+    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
+    if lu_class is None:
+      raise errors.OpCodeUnknown("Unknown opcode")
+
+    if timeout is None:
+      calc_timeout = lambda: None
+    else:
+      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
+
     self._cbs = cbs
     try:
-      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
-      if lu_class is None:
-        raise errors.OpCodeUnknown("Unknown opcode")
-
-      timeout_strategy = _LockAttemptTimeoutStrategy()
+      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+      # and in a shared fashion otherwise (to prevent concurrent run with
+      # an exclusive LU).
+      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                          not lu_class.REQ_BGL, calc_timeout(),
+                          priority)
+      try:
+        lu = lu_class(self, op, self.context, self.rpc)
+        lu.ExpandNames()
+        assert lu.needed_locks is not None, "needed_locks not set by LU"
 
-      while True:
         try:
-          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
-
-          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-          # and in a shared fashion otherwise (to prevent concurrent run with
-          # an exclusive LU.
-          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                                not lu_class.REQ_BGL, acquire_timeout) is None:
-            raise _LockAcquireTimeout()
-
-          try:
-            lu = lu_class(self, op, self.context, self.rpc)
-            lu.ExpandNames()
-            assert lu.needed_locks is not None, "needed_locks not set by LU"
-
-            try:
-              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
-                                         timeout_strategy.CalcRemainingTimeout)
-            finally:
-              if self._ec_id:
-                self.context.cfg.DropECReservations(self._ec_id)
-
-          finally:
-            self.context.glm.release(locking.LEVEL_CLUSTER)
-
-        except _LockAcquireTimeout:
-          # Timeout while waiting for lock, try again
-          pass
-
-        timeout_strategy = timeout_strategy.NextAttempt()
-
+          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+                                     priority)
+        finally:
+          if self._ec_id:
+            self.context.cfg.DropECReservations(self._ec_id)
+      finally:
+        self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None