X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/831bbbc1854e11504af780daa17b176eeebff262..dadf6b7d371ff1ee7cd3dd51dd8e633133233317:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index 9f3eacb..4ee6de5 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -38,6 +38,11 @@ from ganeti import errors from ganeti import rpc from ganeti import cmdlib from ganeti import locking +from ganeti import utils + + +_OP_PREFIX = "Op" +_LU_PREFIX = "LU" class LockAcquireTimeout(Exception): @@ -50,81 +55,61 @@ def _CalculateLockAttemptTimeouts(): """Calculate timeouts for lock attempts. """ - result = [1.0] + result = [constants.LOCK_ATTEMPTS_MINWAIT] + running_sum = result[0] - # Wait for a total of at least 150s before doing a blocking acquire - while sum(result) < 150.0: + # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a + # blocking acquire + while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: timeout = (result[-1] * 1.05) ** 1.25 - # Cap timeout at 10 seconds. This gives other jobs a chance to run - # even if we're still trying to get our locks, before finally moving - # to a blocking acquire. - if timeout > 10.0: - timeout = 10.0 - - elif timeout < 0.1: - # Lower boundary for safety - timeout = 0.1 + # Cap max timeout. This gives other jobs a chance to run even if + # we're still trying to get our locks, before finally moving to a + # blocking acquire. + timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) + # And also cap the lower boundary for safety + timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) result.append(timeout) + running_sum += timeout return result -class _LockAttemptTimeoutStrategy(object): +class LockAttemptTimeoutStrategy(object): """Class with lock acquire timeout strategy. """ __slots__ = [ - "_attempt", + "_timeouts", "_random_fn", - "_start_time", "_time_fn", - "_running_timeout", ] _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts() - def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random): + def __init__(self, _time_fn=time.time, _random_fn=random.random): """Initializes this class. - @type attempt: int - @param attempt: Current attempt number @param _time_fn: Time function for unittests @param _random_fn: Random number generator for unittests """ object.__init__(self) - if attempt < 0: - raise ValueError("Attempt must be zero or positive") - - self._attempt = attempt + self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT) self._time_fn = _time_fn self._random_fn = _random_fn - try: - timeout = self._TIMEOUT_PER_ATTEMPT[attempt] - except IndexError: - # No more timeouts, do blocking acquire - timeout = None - - self._running_timeout = locking.RunningTimeout(timeout, False, - _time_fn=_time_fn) - def NextAttempt(self): - """Returns the strategy for the next attempt. - - """ - return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1, - _time_fn=self._time_fn, - _random_fn=self._random_fn) - - def CalcRemainingTimeout(self): - """Returns the remaining timeout. + """Returns the timeout for the next attempt. """ - timeout = self._running_timeout.Remaining() + try: + timeout = self._timeouts.next() + except StopIteration: + # No more timeouts, do blocking acquire + timeout = None if timeout is not None: # Add a small variation (-/+ 5%) to timeout. This helps in situations @@ -159,69 +144,28 @@ class OpExecCbBase: # pylint: disable-msg=W0232 """ +def _LUNameForOpName(opname): + """Computes the LU name for a given OpCode name. + + """ + assert opname.startswith(_OP_PREFIX), \ + "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname) + + return _LU_PREFIX + opname[len(_OP_PREFIX):] + + +def _ComputeDispatchTable(): + """Computes the opcode-to-lu dispatch table. + + """ + return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__))) + for op in opcodes.OP_MAPPING.values() + if op.WITH_LU) + + class Processor(object): """Object which runs OpCodes""" - DISPATCH_TABLE = { - # Cluster - opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster, - opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster, - opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo, - opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster, - opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues, - opcodes.OpRenameCluster: cmdlib.LURenameCluster, - opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks, - opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams, - opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig, - opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes, - # node lu - opcodes.OpAddNode: cmdlib.LUAddNode, - opcodes.OpQueryNodes: cmdlib.LUQueryNodes, - opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes, - opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage, - opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage, - opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage, - opcodes.OpRemoveNode: cmdlib.LURemoveNode, - opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams, - opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode, - opcodes.OpMigrateNode: cmdlib.LUMigrateNode, - opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy, - # instance lu - opcodes.OpCreateInstance: cmdlib.LUCreateInstance, - opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance, - opcodes.OpRemoveInstance: cmdlib.LURemoveInstance, - opcodes.OpRenameInstance: cmdlib.LURenameInstance, - opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks, - opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance, - opcodes.OpStartupInstance: cmdlib.LUStartupInstance, - opcodes.OpRebootInstance: cmdlib.LURebootInstance, - opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks, - opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks, - opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks, - opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance, - opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance, - opcodes.OpMoveInstance: cmdlib.LUMoveInstance, - opcodes.OpConnectConsole: cmdlib.LUConnectConsole, - opcodes.OpQueryInstances: cmdlib.LUQueryInstances, - opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData, - opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams, - opcodes.OpGrowDisk: cmdlib.LUGrowDisk, - # os lu - opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS, - # exports lu - opcodes.OpQueryExports: cmdlib.LUQueryExports, - opcodes.OpPrepareExport: cmdlib.LUPrepareExport, - opcodes.OpExportInstance: cmdlib.LUExportInstance, - opcodes.OpRemoveExport: cmdlib.LURemoveExport, - # tags lu - opcodes.OpGetTags: cmdlib.LUGetTags, - opcodes.OpSearchTags: cmdlib.LUSearchTags, - opcodes.OpAddTags: cmdlib.LUAddTags, - opcodes.OpDelTags: cmdlib.LUDelTags, - # test lu - opcodes.OpTestDelay: cmdlib.LUTestDelay, - opcodes.OpTestAllocator: cmdlib.LUTestAllocator, - opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue, - } + DISPATCH_TABLE = _ComputeDispatchTable() def __init__(self, context, ec_id): """Constructor for Processor @@ -238,7 +182,7 @@ class Processor(object): self.rpc = rpc.RpcRunner(context.cfg) self.hmclass = HooksMaster - def _AcquireLocks(self, level, names, shared, timeout): + def _AcquireLocks(self, level, names, shared, timeout, priority): """Acquires locks via the Ganeti lock manager. @type level: int @@ -249,13 +193,18 @@ class Processor(object): @param shared: Whether the locks should be acquired in shared mode @type timeout: None or float @param timeout: Timeout for acquiring the locks + @raise LockAcquireTimeout: In case locks couldn't be acquired in specified + amount of time """ if self._cbs: self._cbs.CheckCancel() acquired = self.context.glm.acquire(level, names, shared=shared, - timeout=timeout) + timeout=timeout, priority=priority) + + if acquired is None: + raise LockAcquireTimeout() return acquired @@ -290,7 +239,7 @@ class Processor(object): return result - def _LockAndExecLU(self, lu, level, calc_timeout): + def _LockAndExecLU(self, lu, level, calc_timeout, priority): """Execute a Logical Unit, with the needed locks. This is a recursive function that starts locking the given level, and @@ -325,11 +274,7 @@ class Processor(object): needed_locks = lu.needed_locks[level] acquired = self._AcquireLocks(level, needed_locks, share, - calc_timeout()) - - if acquired is None: - raise LockAcquireTimeout() - + calc_timeout(), priority) else: # Adding locks add_locks = lu.add_locks[level] @@ -348,7 +293,7 @@ class Processor(object): try: lu.acquired_locks[level] = acquired - result = self._LockAndExecLU(lu, level + 1, calc_timeout) + result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) finally: if level in lu.remove_locks: self.context.glm.remove(level, lu.remove_locks[level]) @@ -357,11 +302,11 @@ class Processor(object): self.context.glm.release(level) else: - result = self._LockAndExecLU(lu, level + 1, calc_timeout) + result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) return result - def ExecOpCode(self, op, cbs, timeout=None): + def ExecOpCode(self, op, cbs, timeout=None, priority=None): """Execute an opcode. @type op: an OpCode instance @@ -370,6 +315,8 @@ class Processor(object): @param cbs: Runtime callbacks @type timeout: float or None @param timeout: Maximum time to acquire all locks, None for no timeout + @type priority: number or None + @param priority: Priority for acquiring lock(s) @raise LockAcquireTimeout: In case locks couldn't be acquired in specified amount of time @@ -385,24 +332,24 @@ class Processor(object): if timeout is None: calc_timeout = lambda: None else: - calc_timeout = locking.RunningTimeout(timeout, False).Remaining + calc_timeout = utils.RunningTimeout(timeout, False).Remaining self._cbs = cbs try: # Acquire the Big Ganeti Lock exclusively if this LU requires it, # and in a shared fashion otherwise (to prevent concurrent run with # an exclusive LU. - if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, - not lu_class.REQ_BGL, calc_timeout()) is None: - raise LockAcquireTimeout() - + self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, + not lu_class.REQ_BGL, calc_timeout(), + priority) try: lu = lu_class(self, op, self.context, self.rpc) lu.ExpandNames() assert lu.needed_locks is not None, "needed_locks not set by LU" try: - return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout) + return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout, + priority) finally: if self._ec_id: self.context.cfg.DropECReservations(self._ec_id) @@ -454,8 +401,12 @@ class Processor(object): self.Log(" - INFO: %s" % message) def GetECId(self): + """Returns the current execution context ID. + + """ if not self._ec_id: - errors.ProgrammerError("Tried to use execution context id when not set") + raise errors.ProgrammerError("Tried to use execution context id when" + " not set") return self._ec_id