+class _LockAcquireTimeout(Exception):
+ """Internal exception to report timeouts on acquiring locks.
+
+ """
+
+
+def _CalculateLockAttemptTimeouts():
+ """Calculate timeouts for lock attempts.
+
+ """
+ result = [1.0]
+
+ # Wait for a total of at least 150s before doing a blocking acquire
+ while sum(result) < 150.0:
+ timeout = (result[-1] * 1.05) ** 1.25
+
+ # Cap timeout at 10 seconds. This gives other jobs a chance to run
+ # even if we're still trying to get our locks, before finally moving
+ # to a blocking acquire.
+ if timeout > 10.0:
+ timeout = 10.0
+
+ elif timeout < 0.1:
+ # Lower boundary for safety
+ timeout = 0.1
+
+ result.append(timeout)
+
+ return result
+
+
+class _LockAttemptTimeoutStrategy(object):
+ """Class with lock acquire timeout strategy.
+
+ """
+ __slots__ = [
+ "_attempt",
+ "_random_fn",
+ "_start_time",
+ "_time_fn",
+ "_running_timeout",
+ ]
+
+ _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
+
+ def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
+ """Initializes this class.
+
+ @type attempt: int
+ @param attempt: Current attempt number
+ @param _time_fn: Time function for unittests
+ @param _random_fn: Random number generator for unittests
+
+ """
+ object.__init__(self)
+
+ if attempt < 0:
+ raise ValueError("Attempt must be zero or positive")
+
+ self._attempt = attempt
+ self._time_fn = _time_fn
+ self._random_fn = _random_fn
+
+ try:
+ timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+ except IndexError:
+ # No more timeouts, do blocking acquire
+ timeout = None
+
+ self._running_timeout = locking.RunningTimeout(timeout, False,
+ _time_fn=_time_fn)
+
+ def NextAttempt(self):
+ """Returns the strategy for the next attempt.
+
+ """
+ return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
+ _time_fn=self._time_fn,
+ _random_fn=self._random_fn)
+
+ def CalcRemainingTimeout(self):
+ """Returns the remaining timeout.
+
+ """
+ timeout = self._running_timeout.Remaining()
+
+ if timeout is not None:
+ # Add a small variation (-/+ 5%) to timeout. This helps in situations
+ # where two or more jobs are fighting for the same lock(s).
+ variation_range = timeout * 0.1
+ timeout += ((self._random_fn() * variation_range) -
+ (variation_range * 0.5))
+
+ return timeout
+
+
+class OpExecCbBase: # pylint: disable-msg=W0232
+ """Base class for OpCode execution callbacks.
+
+ """
+ def NotifyStart(self):
+ """Called when we are about to execute the LU.
+
+ This function is called when we're about to start the lu's Exec() method,
+ that is, after we have acquired all locks.
+
+ """
+
+ def Feedback(self, *args):
+ """Sends feedback from the LU code to the end-user.
+
+ """
+
+ def ReportLocks(self, msg):
+ """Report lock operations.
+
+ """
+
+