+from ganeti import utils
+from ganeti import compat
+
+
+_OP_PREFIX = "Op"
+_LU_PREFIX = "LU"
+
+
+class LockAcquireTimeout(Exception):
+ """Exception to report timeouts on acquiring locks.
+
+ """
+
+
+def _CalculateLockAttemptTimeouts():
+ """Calculate timeouts for lock attempts.
+
+ """
+ result = [constants.LOCK_ATTEMPTS_MINWAIT]
+ running_sum = result[0]
+
+ # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+ # blocking acquire
+ while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
+ timeout = (result[-1] * 1.05) ** 1.25
+
+ # Cap max timeout. This gives other jobs a chance to run even if
+ # we're still trying to get our locks, before finally moving to a
+ # blocking acquire.
+ timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+ # And also cap the lower boundary for safety
+ timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
+
+ result.append(timeout)
+ running_sum += timeout
+
+ return result
+
+
+class LockAttemptTimeoutStrategy(object):
+ """Class with lock acquire timeout strategy.
+
+ """
+ __slots__ = [
+ "_timeouts",
+ "_random_fn",
+ "_time_fn",
+ ]
+
+ _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
+
+ def __init__(self, _time_fn=time.time, _random_fn=random.random):
+ """Initializes this class.
+
+ @param _time_fn: Time function for unittests
+ @param _random_fn: Random number generator for unittests
+
+ """
+ object.__init__(self)
+
+ self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
+ self._time_fn = _time_fn
+ self._random_fn = _random_fn
+
+ def NextAttempt(self):
+ """Returns the timeout for the next attempt.
+
+ """
+ try:
+ timeout = self._timeouts.next()
+ except StopIteration:
+ # No more timeouts, do blocking acquire
+ timeout = None
+
+ if timeout is not None:
+ # Add a small variation (-/+ 5%) to timeout. This helps in situations
+ # where two or more jobs are fighting for the same lock(s).
+ variation_range = timeout * 0.1
+ timeout += ((self._random_fn() * variation_range) -
+ (variation_range * 0.5))
+
+ return timeout
+
+
+class OpExecCbBase: # pylint: disable=W0232
+ """Base class for OpCode execution callbacks.
+
+ """
+ def NotifyStart(self):
+ """Called when we are about to execute the LU.
+
+ This function is called when we're about to start the lu's Exec() method,
+ that is, after we have acquired all locks.
+
+ """
+
+ def Feedback(self, *args):
+ """Sends feedback from the LU code to the end-user.
+
+ """
+
+ def CheckCancel(self):
+ """Check whether job has been cancelled.
+
+ """
+
+ def SubmitManyJobs(self, jobs):
+ """Submits jobs for processing.
+
+ See L{jqueue.JobQueue.SubmitManyJobs}.
+
+ """
+ raise NotImplementedError
+
+
+def _LUNameForOpName(opname):
+ """Computes the LU name for a given OpCode name.
+
+ """
+ assert opname.startswith(_OP_PREFIX), \
+ "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
+
+ return _LU_PREFIX + opname[len(_OP_PREFIX):]
+
+
+def _ComputeDispatchTable():
+ """Computes the opcode-to-lu dispatch table.
+
+ """
+ return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
+ for op in opcodes.OP_MAPPING.values()
+ if op.WITH_LU)
+
+
+def _RpcResultsToHooksResults(rpc_results):
+ """Function to convert RPC results to the format expected by HooksMaster.
+
+ @type rpc_results: dict(node: L{rpc.RpcResult})
+ @param rpc_results: RPC results
+ @rtype: dict(node: (fail_msg, offline, hooks_results))
+ @return: RPC results unpacked according to the format expected by
+ L({mcpu.HooksMaster}
+
+ """
+ return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
+ for (node, rpc_res) in rpc_results.items())