opcodes: document OP_DSC_FIELD in OpCode and OpCode.Summary()
[ganeti-local] / lib / mcpu.py
index 9f3eacb..a724eb9 100644 (file)
@@ -38,6 +38,7 @@ from ganeti import errors
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
+from ganeti import utils
 
 
 class LockAcquireTimeout(Exception):
 
 
 class LockAcquireTimeout(Exception):
@@ -71,60 +72,40 @@ def _CalculateLockAttemptTimeouts():
   return result
 
 
   return result
 
 
-class _LockAttemptTimeoutStrategy(object):
+class LockAttemptTimeoutStrategy(object):
   """Class with lock acquire timeout strategy.
 
   """
   __slots__ = [
   """Class with lock acquire timeout strategy.
 
   """
   __slots__ = [
-    "_attempt",
+    "_timeouts",
     "_random_fn",
     "_random_fn",
-    "_start_time",
     "_time_fn",
     "_time_fn",
-    "_running_timeout",
     ]
 
   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
 
     ]
 
   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
 
-  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
+  def __init__(self, _time_fn=time.time, _random_fn=random.random):
     """Initializes this class.
 
     """Initializes this class.
 
-    @type attempt: int
-    @param attempt: Current attempt number
     @param _time_fn: Time function for unittests
     @param _random_fn: Random number generator for unittests
 
     """
     object.__init__(self)
 
     @param _time_fn: Time function for unittests
     @param _random_fn: Random number generator for unittests
 
     """
     object.__init__(self)
 
-    if attempt < 0:
-      raise ValueError("Attempt must be zero or positive")
-
-    self._attempt = attempt
+    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
     self._time_fn = _time_fn
     self._random_fn = _random_fn
 
     self._time_fn = _time_fn
     self._random_fn = _random_fn
 
-    try:
-      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
-    except IndexError:
-      # No more timeouts, do blocking acquire
-      timeout = None
-
-    self._running_timeout = locking.RunningTimeout(timeout, False,
-                                                   _time_fn=_time_fn)
-
   def NextAttempt(self):
   def NextAttempt(self):
-    """Returns the strategy for the next attempt.
-
-    """
-    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
-                                       _time_fn=self._time_fn,
-                                       _random_fn=self._random_fn)
-
-  def CalcRemainingTimeout(self):
-    """Returns the remaining timeout.
+    """Returns the timeout for the next attempt.
 
     """
 
     """
-    timeout = self._running_timeout.Remaining()
+    try:
+      timeout = self._timeouts.next()
+    except StopIteration:
+      # No more timeouts, do blocking acquire
+      timeout = None
 
     if timeout is not None:
       # Add a small variation (-/+ 5%) to timeout. This helps in situations
 
     if timeout is not None:
       # Add a small variation (-/+ 5%) to timeout. This helps in situations
@@ -173,6 +154,8 @@ class Processor(object):
     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
+    opcodes.OpQuery: cmdlib.LUQuery,
+    opcodes.OpQueryFields: cmdlib.LUQueryFields,
     # node lu
     opcodes.OpAddNode: cmdlib.LUAddNode,
     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
     # node lu
     opcodes.OpAddNode: cmdlib.LUAddNode,
     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
@@ -205,6 +188,8 @@ class Processor(object):
     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
+    # node group lu
+    opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
     # os lu
     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
     # exports lu
     # os lu
     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
     # exports lu
@@ -238,7 +223,7 @@ class Processor(object):
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
 
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
 
-  def _AcquireLocks(self, level, names, shared, timeout):
+  def _AcquireLocks(self, level, names, shared, timeout, priority):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -249,13 +234,18 @@ class Processor(object):
     @param shared: Whether the locks should be acquired in shared mode
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
     @param shared: Whether the locks should be acquired in shared mode
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
 
     """
     if self._cbs:
       self._cbs.CheckCancel()
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
 
     """
     if self._cbs:
       self._cbs.CheckCancel()
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout)
+                                        timeout=timeout, priority=priority)
+
+    if acquired is None:
+      raise LockAcquireTimeout()
 
     return acquired
 
 
     return acquired
 
@@ -290,7 +280,7 @@ class Processor(object):
 
     return result
 
 
     return result
 
-  def _LockAndExecLU(self, lu, level, calc_timeout):
+  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -325,11 +315,7 @@ class Processor(object):
           needed_locks = lu.needed_locks[level]
 
           acquired = self._AcquireLocks(level, needed_locks, share,
           needed_locks = lu.needed_locks[level]
 
           acquired = self._AcquireLocks(level, needed_locks, share,
-                                        calc_timeout())
-
-          if acquired is None:
-            raise LockAcquireTimeout()
-
+                                        calc_timeout(), priority)
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -348,7 +334,7 @@ class Processor(object):
         try:
           lu.acquired_locks[level] = acquired
 
         try:
           lu.acquired_locks[level] = acquired
 
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
@@ -357,11 +343,11 @@ class Processor(object):
           self.context.glm.release(level)
 
     else:
           self.context.glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
 
     return result
 
 
     return result
 
-  def ExecOpCode(self, op, cbs, timeout=None):
+  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
     """Execute an opcode.
 
     @type op: an OpCode instance
@@ -370,6 +356,8 @@ class Processor(object):
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
+    @type priority: number or None
+    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
@@ -385,24 +373,24 @@ class Processor(object):
     if timeout is None:
       calc_timeout = lambda: None
     else:
     if timeout is None:
       calc_timeout = lambda: None
     else:
-      calc_timeout = locking.RunningTimeout(timeout, False).Remaining
+      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
 
     self._cbs = cbs
     try:
       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
       # and in a shared fashion otherwise (to prevent concurrent run with
       # an exclusive LU.
 
     self._cbs = cbs
     try:
       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
       # and in a shared fashion otherwise (to prevent concurrent run with
       # an exclusive LU.
-      if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                            not lu_class.REQ_BGL, calc_timeout()) is None:
-        raise LockAcquireTimeout()
-
+      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                          not lu_class.REQ_BGL, calc_timeout(),
+                          priority)
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
+          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+                                     priority)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)