+import itertools
+import random
+import time
+
+from ganeti import opcodes
+from ganeti import constants
+from ganeti import errors
+from ganeti import cmdlib
+from ganeti import locking
+from ganeti import utils
+from ganeti import compat
+from ganeti import pathutils
+
+
+_OP_PREFIX = "Op"
+_LU_PREFIX = "LU"
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) when they acquire all node or node resource locks
+_NODE_ALLOC_WHITELIST = frozenset([])
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
+#: or node resource locks
+_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
+ cmdlib.LUBackupExport,
+ cmdlib.LUBackupRemove,
+ cmdlib.LUOobCommand,
+ ])
+
+
+class LockAcquireTimeout(Exception):
+ """Exception to report timeouts on acquiring locks.
+
+ """
+
+
+def _CalculateLockAttemptTimeouts():
+ """Calculate timeouts for lock attempts.
+
+ """
+ result = [constants.LOCK_ATTEMPTS_MINWAIT]
+ running_sum = result[0]
+
+ # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+ # blocking acquire
+ while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
+ timeout = (result[-1] * 1.05) ** 1.25
+
+ # Cap max timeout. This gives other jobs a chance to run even if
+ # we're still trying to get our locks, before finally moving to a
+ # blocking acquire.
+ timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+ # And also cap the lower boundary for safety
+ timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
+
+ result.append(timeout)
+ running_sum += timeout
+
+ return result
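+
+# Illustrative only: the series produced by _CalculateLockAttemptTimeouts()
+# starts at LOCK_ATTEMPTS_MINWAIT and grows roughly exponentially via
+# next = (prev * 1.05) ** 1.25, capped at LOCK_ATTEMPTS_MAXWAIT, until the
+# attempts sum to at least LOCK_ATTEMPTS_TIMEOUT. Assuming (hypothetically)
+# LOCK_ATTEMPTS_MINWAIT = 1.0, the first values would be approximately:
+#   [1.0, 1.06, 1.15, 1.26, ...]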
+
+
+class LockAttemptTimeoutStrategy(object):
+ """Class with lock acquire timeout strategy.
+
+ """
+ __slots__ = [
+ "_timeouts",
+ "_random_fn",
+ "_time_fn",
+ ]
+
+ _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
+
+ def __init__(self, _time_fn=time.time, _random_fn=random.random):
+ """Initializes this class.
+
+ @param _time_fn: Time function for unittests
+ @param _random_fn: Random number generator for unittests
+
+ """
+ object.__init__(self)
+
+ self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
+ self._time_fn = _time_fn
+ self._random_fn = _random_fn
+
+ def NextAttempt(self):
+ """Returns the timeout for the next attempt.
+
+ """
+ try:
+      timeout = next(self._timeouts)
+ except StopIteration:
+ # No more timeouts, do blocking acquire
+ timeout = None
+
+ if timeout is not None:
+ # Add a small variation (-/+ 5%) to timeout. This helps in situations
+ # where two or more jobs are fighting for the same lock(s).
+ variation_range = timeout * 0.1
+ timeout += ((self._random_fn() * variation_range) -
+ (variation_range * 0.5))
+
+ return timeout
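+
+# A minimal usage sketch (illustrative; "AcquireLocks" is a hypothetical
+# helper, not part of this module): callers iterate over NextAttempt() and
+# fall back to a blocking acquire once it returns None.
+#
+#   strategy = LockAttemptTimeoutStrategy()
+#   acquired = False
+#   while not acquired:
+#     timeout = strategy.NextAttempt()  # None means block indefinitely
+#     acquired = AcquireLocks(timeout=timeout)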
+
+
+class OpExecCbBase: # pylint: disable=W0232
+ """Base class for OpCode execution callbacks.
+
+ """
+ def NotifyStart(self):
+ """Called when we are about to execute the LU.
+
+    This function is called when we're about to start the LU's Exec() method,
+    that is, after we have acquired all locks.
+
+ """
+
+ def Feedback(self, *args):
+ """Sends feedback from the LU code to the end-user.
+
+ """
+
+ def CurrentPriority(self): # pylint: disable=R0201
+ """Returns current priority or C{None}.
+
+ """
+ return None
+
+ def SubmitManyJobs(self, jobs):
+ """Submits jobs for processing.
+
+ See L{jqueue.JobQueue.SubmitManyJobs}.
+
+ """
+ raise NotImplementedError
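+
+# Illustrative only: a minimal concrete callback might look like the sketch
+# below (the class name and "my_job_queue" object are hypothetical):
+#
+#   class _LoggingCallbacks(OpExecCbBase):
+#     def Feedback(self, *args):
+#       logging.info("LU feedback: %r", (args, ))
+#
+#     def SubmitManyJobs(self, jobs):
+#       return my_job_queue.SubmitManyJobs(jobs)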
+
+
+def _LUNameForOpName(opname):
+ """Computes the LU name for a given OpCode name.
+
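+  For example, the opcode name "OpInstanceCreate" maps to the LU name
+  "LUInstanceCreate".
+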
+ """
+ assert opname.startswith(_OP_PREFIX), \
+ "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
+
+ return _LU_PREFIX + opname[len(_OP_PREFIX):]
+
+
+def _ComputeDispatchTable():
+ """Computes the opcode-to-lu dispatch table.
+
+ """
+ return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
+ for op in opcodes.OP_MAPPING.values()
+ if op.WITH_LU)
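+
+# Illustrative only: given the naming convention above, the resulting table
+# maps opcode classes to logical unit classes, e.g.
+# {opcodes.OpInstanceCreate: cmdlib.LUInstanceCreate, ...}, for every opcode
+# whose WITH_LU attribute is true.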
+
+
+def _SetBaseOpParams(src, defcomment, dst):
+ """Copies basic opcode parameters.
+
+ @type src: L{opcodes.OpCode}
+ @param src: Source opcode
+ @type defcomment: string
+ @param defcomment: Comment to specify if not already given
+ @type dst: L{opcodes.OpCode}
+ @param dst: Destination opcode
+
+ """
+ if hasattr(src, "debug_level"):
+ dst.debug_level = src.debug_level
+
+ if (getattr(dst, "priority", None) is None and
+ hasattr(src, "priority")):
+ dst.priority = src.priority
+
+ if not getattr(dst, opcodes.COMMENT_ATTR, None):
+ dst.comment = defcomment
+
+
+def _ProcessResult(submit_fn, op, result):
+ """Examines opcode result.
+
+ If necessary, additional processing on the result is done.
+
+ """
+ if isinstance(result, cmdlib.ResultWithJobs):
+ # Copy basic parameters (e.g. priority)
+    for nested_op in itertools.chain(*result.jobs):
+      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, nested_op)
+
+ # Submit jobs
+ job_submission = submit_fn(result.jobs)
+
+ # Build dictionary
+ result = result.other
+
+ assert constants.JOB_IDS_KEY not in result, \
+ "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+ result[constants.JOB_IDS_KEY] = job_submission
+
+ return result
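+
+# Illustrative only: if an LU returns
+# cmdlib.ResultWithJobs([[op_a], [op_b]], foo="bar"), then _ProcessResult
+# submits the two single-opcode jobs via submit_fn and returns
+# {"foo": "bar", constants.JOB_IDS_KEY: <submit_fn's return value>}.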
+
+
+def _FailingSubmitManyJobs(_):
+ """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
+
+ """
+  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
+                               " queries) cannot submit jobs")
+
+
+def _RpcResultsToHooksResults(rpc_results):
+ """Function to convert RPC results to the format expected by HooksMaster.
+
+ @type rpc_results: dict(node: L{rpc.RpcResult})
+ @param rpc_results: RPC results
+ @rtype: dict(node: (fail_msg, offline, hooks_results))
+ @return: RPC results unpacked according to the format expected by
+    L{mcpu.HooksMaster}
+
+ """
+ return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
+ for (node, rpc_res) in rpc_results.items())
+
+
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+ _nal_whitelist=_NODE_ALLOC_WHITELIST):
+ """Performs consistency checks on locks acquired by a logical unit.
+
+ @type lu: L{cmdlib.LogicalUnit}
+ @param lu: Logical unit instance
+ @type glm: L{locking.GanetiLockManager}
+ @param glm: Lock manager
+
+ """
+ if not __debug__:
+ return
+
+ have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+
+ for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+ # TODO: Verify using actual lock mode, not using LU variables
+ if level in lu.needed_locks:
+ share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
+ share_level = lu.share_locks[level]
+
+ if lu.__class__ in _mode_whitelist:
+ assert share_node_alloc != share_level, \
+ "LU is whitelisted to use different modes for node allocation lock"
+ else:
+ assert bool(share_node_alloc) == bool(share_level), \
+ ("Node allocation lock must be acquired using the same mode as nodes"
+ " and node resources")
+
+ if lu.__class__ in _nal_whitelist:
+ assert not have_nal, \
+ "LU is whitelisted for not acquiring the node allocation lock"
+ elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+ assert have_nal, \
+ ("Node allocation lock must be used if an LU acquires all nodes"
+ " or node resources")