X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7b4c1cb96bbb187bd7ce3e8714fcd010f5af3f85..c260fa25a3aa5817417aab8adff96b714cb5f234:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index e5c7922..4ee6de5 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -38,10 +38,15 @@ from ganeti import errors from ganeti import rpc from ganeti import cmdlib from ganeti import locking +from ganeti import utils -class _LockAcquireTimeout(Exception): - """Internal exception to report timeouts on acquiring locks. +_OP_PREFIX = "Op" +_LU_PREFIX = "LU" + + +class LockAcquireTimeout(Exception): + """Exception to report timeouts on acquiring locks. """ @@ -50,81 +55,61 @@ def _CalculateLockAttemptTimeouts(): """Calculate timeouts for lock attempts. """ - result = [1.0] + result = [constants.LOCK_ATTEMPTS_MINWAIT] + running_sum = result[0] - # Wait for a total of at least 150s before doing a blocking acquire - while sum(result) < 150.0: + # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a + # blocking acquire + while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: timeout = (result[-1] * 1.05) ** 1.25 - # Cap timeout at 10 seconds. This gives other jobs a chance to run - # even if we're still trying to get our locks, before finally moving - # to a blocking acquire. - if timeout > 10.0: - timeout = 10.0 - - elif timeout < 0.1: - # Lower boundary for safety - timeout = 0.1 + # Cap max timeout. This gives other jobs a chance to run even if + # we're still trying to get our locks, before finally moving to a + # blocking acquire. 
+ timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) + # And also cap the lower boundary for safety + timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) result.append(timeout) + running_sum += timeout return result -class _LockAttemptTimeoutStrategy(object): +class LockAttemptTimeoutStrategy(object): """Class with lock acquire timeout strategy. """ __slots__ = [ - "_attempt", + "_timeouts", "_random_fn", - "_start_time", "_time_fn", - "_running_timeout", ] _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts() - def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random): + def __init__(self, _time_fn=time.time, _random_fn=random.random): """Initializes this class. - @type attempt: int - @param attempt: Current attempt number @param _time_fn: Time function for unittests @param _random_fn: Random number generator for unittests """ object.__init__(self) - if attempt < 0: - raise ValueError("Attempt must be zero or positive") - - self._attempt = attempt + self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT) self._time_fn = _time_fn self._random_fn = _random_fn - try: - timeout = self._TIMEOUT_PER_ATTEMPT[attempt] - except IndexError: - # No more timeouts, do blocking acquire - timeout = None - - self._running_timeout = locking.RunningTimeout(timeout, False, - _time_fn=_time_fn) - def NextAttempt(self): - """Returns the strategy for the next attempt. + """Returns the timeout for the next attempt. """ - return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1, - _time_fn=self._time_fn, - _random_fn=self._random_fn) - - def CalcRemainingTimeout(self): - """Returns the remaining timeout. - - """ - timeout = self._running_timeout.Remaining() + try: + timeout = self._timeouts.next() + except StopIteration: + # No more timeouts, do blocking acquire + timeout = None if timeout is not None: # Add a small variation (-/+ 5%) to timeout. 
This helps in situations @@ -153,74 +138,34 @@ class OpExecCbBase: # pylint: disable-msg=W0232 """ - def ReportLocks(self, msg): - """Report lock operations. + def CheckCancel(self): + """Check whether job has been cancelled. """ +def _LUNameForOpName(opname): + """Computes the LU name for a given OpCode name. + + """ + assert opname.startswith(_OP_PREFIX), \ + "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname) + + return _LU_PREFIX + opname[len(_OP_PREFIX):] + + +def _ComputeDispatchTable(): + """Computes the opcode-to-lu dispatch table. + + """ + return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__))) + for op in opcodes.OP_MAPPING.values() + if op.WITH_LU) + + class Processor(object): """Object which runs OpCodes""" - DISPATCH_TABLE = { - # Cluster - opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster, - opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster, - opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo, - opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster, - opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues, - opcodes.OpRenameCluster: cmdlib.LURenameCluster, - opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks, - opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams, - opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig, - opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes, - # node lu - opcodes.OpAddNode: cmdlib.LUAddNode, - opcodes.OpQueryNodes: cmdlib.LUQueryNodes, - opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes, - opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage, - opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage, - opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage, - opcodes.OpRemoveNode: cmdlib.LURemoveNode, - opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams, - opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode, - opcodes.OpMigrateNode: cmdlib.LUMigrateNode, - opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy, - # instance lu - opcodes.OpCreateInstance: cmdlib.LUCreateInstance, 
- opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance, - opcodes.OpRemoveInstance: cmdlib.LURemoveInstance, - opcodes.OpRenameInstance: cmdlib.LURenameInstance, - opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks, - opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance, - opcodes.OpStartupInstance: cmdlib.LUStartupInstance, - opcodes.OpRebootInstance: cmdlib.LURebootInstance, - opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks, - opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks, - opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks, - opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance, - opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance, - opcodes.OpMoveInstance: cmdlib.LUMoveInstance, - opcodes.OpConnectConsole: cmdlib.LUConnectConsole, - opcodes.OpQueryInstances: cmdlib.LUQueryInstances, - opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData, - opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams, - opcodes.OpGrowDisk: cmdlib.LUGrowDisk, - # os lu - opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS, - # exports lu - opcodes.OpQueryExports: cmdlib.LUQueryExports, - opcodes.OpPrepareExport: cmdlib.LUPrepareExport, - opcodes.OpExportInstance: cmdlib.LUExportInstance, - opcodes.OpRemoveExport: cmdlib.LURemoveExport, - # tags lu - opcodes.OpGetTags: cmdlib.LUGetTags, - opcodes.OpSearchTags: cmdlib.LUSearchTags, - opcodes.OpAddTags: cmdlib.LUAddTags, - opcodes.OpDelTags: cmdlib.LUDelTags, - # test lu - opcodes.OpTestDelay: cmdlib.LUTestDelay, - opcodes.OpTestAllocator: cmdlib.LUTestAllocator, - } + DISPATCH_TABLE = _ComputeDispatchTable() def __init__(self, context, ec_id): """Constructor for Processor @@ -237,60 +182,7 @@ class Processor(object): self.rpc = rpc.RpcRunner(context.cfg) self.hmclass = HooksMaster - def _ReportLocks(self, level, names, shared, timeout, acquired, result): - """Reports lock operations. 
- - @type level: int - @param level: Lock level - @type names: list or string - @param names: Lock names - @type shared: bool - @param shared: Whether the locks should be acquired in shared mode - @type timeout: None or float - @param timeout: Timeout for acquiring the locks - @type acquired: bool - @param acquired: Whether the locks have already been acquired - @type result: None or set - @param result: Result from L{locking.GanetiLockManager.acquire} - - """ - parts = [] - - # Build message - if acquired: - if result is None: - parts.append("timeout") - else: - parts.append("acquired") - else: - parts.append("waiting") - if timeout is None: - parts.append("blocking") - else: - parts.append("timeout=%0.6fs" % timeout) - - parts.append(locking.LEVEL_NAMES[level]) - - if names == locking.ALL_SET: - parts.append("ALL") - elif isinstance(names, basestring): - parts.append(names) - else: - parts.append(",".join(sorted(names))) - - if shared: - parts.append("shared") - else: - parts.append("exclusive") - - msg = "/".join(parts) - - logging.debug("LU locks %s", msg) - - if self._cbs: - self._cbs.ReportLocks(msg) - - def _AcquireLocks(self, level, names, shared, timeout): + def _AcquireLocks(self, level, names, shared, timeout, priority): """Acquires locks via the Ganeti lock manager. 
@type level: int @@ -301,14 +193,18 @@ class Processor(object): @param shared: Whether the locks should be acquired in shared mode @type timeout: None or float @param timeout: Timeout for acquiring the locks + @raise LockAcquireTimeout: In case locks couldn't be acquired in specified + amount of time """ - self._ReportLocks(level, names, shared, timeout, False, None) + if self._cbs: + self._cbs.CheckCancel() acquired = self.context.glm.acquire(level, names, shared=shared, - timeout=timeout) + timeout=timeout, priority=priority) - self._ReportLocks(level, names, shared, timeout, True, acquired) + if acquired is None: + raise LockAcquireTimeout() return acquired @@ -343,7 +239,7 @@ class Processor(object): return result - def _LockAndExecLU(self, lu, level, calc_timeout): + def _LockAndExecLU(self, lu, level, calc_timeout, priority): """Execute a Logical Unit, with the needed locks. This is a recursive function that starts locking the given level, and @@ -378,11 +274,7 @@ class Processor(object): needed_locks = lu.needed_locks[level] acquired = self._AcquireLocks(level, needed_locks, share, - calc_timeout()) - - if acquired is None: - raise _LockAcquireTimeout() - + calc_timeout(), priority) else: # Adding locks add_locks = lu.add_locks[level] @@ -401,7 +293,7 @@ class Processor(object): try: lu.acquired_locks[level] = acquired - result = self._LockAndExecLU(lu, level + 1, calc_timeout) + result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) finally: if level in lu.remove_locks: self.context.glm.remove(level, lu.remove_locks[level]) @@ -410,63 +302,59 @@ class Processor(object): self.context.glm.release(level) else: - result = self._LockAndExecLU(lu, level + 1, calc_timeout) + result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) return result - def ExecOpCode(self, op, cbs): + def ExecOpCode(self, op, cbs, timeout=None, priority=None): """Execute an opcode. 
@type op: an OpCode instance @param op: the opcode to be executed @type cbs: L{OpExecCbBase} @param cbs: Runtime callbacks + @type timeout: float or None + @param timeout: Maximum time to acquire all locks, None for no timeout + @type priority: number or None + @param priority: Priority for acquiring lock(s) + @raise LockAcquireTimeout: In case locks couldn't be acquired in specified + amount of time """ if not isinstance(op, opcodes.OpCode): raise errors.ProgrammerError("Non-opcode instance passed" " to ExecOpcode") + lu_class = self.DISPATCH_TABLE.get(op.__class__, None) + if lu_class is None: + raise errors.OpCodeUnknown("Unknown opcode") + + if timeout is None: + calc_timeout = lambda: None + else: + calc_timeout = utils.RunningTimeout(timeout, False).Remaining + self._cbs = cbs try: - lu_class = self.DISPATCH_TABLE.get(op.__class__, None) - if lu_class is None: - raise errors.OpCodeUnknown("Unknown opcode") - - timeout_strategy = _LockAttemptTimeoutStrategy() + # Acquire the Big Ganeti Lock exclusively if this LU requires it, + # and in a shared fashion otherwise (to prevent concurrent run with + # an exclusive LU). + self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, + not lu_class.REQ_BGL, calc_timeout(), + priority) + try: + lu = lu_class(self, op, self.context, self.rpc) + lu.ExpandNames() + assert lu.needed_locks is not None, "needed_locks not set by LU" - while True: try: - acquire_timeout = timeout_strategy.CalcRemainingTimeout() - - # Acquire the Big Ganeti Lock exclusively if this LU requires it, - # and in a shared fashion otherwise (to prevent concurrent run with - # an exclusive LU.
- if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, - not lu_class.REQ_BGL, acquire_timeout) is None: - raise _LockAcquireTimeout() - - try: - lu = lu_class(self, op, self.context, self.rpc) - lu.ExpandNames() - assert lu.needed_locks is not None, "needed_locks not set by LU" - - try: - return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, - timeout_strategy.CalcRemainingTimeout) - finally: - if self._ec_id: - self.context.cfg.DropECReservations(self._ec_id) - - finally: - self.context.glm.release(locking.LEVEL_CLUSTER) - - except _LockAcquireTimeout: - # Timeout while waiting for lock, try again - pass - - timeout_strategy = timeout_strategy.NextAttempt() - + return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout, + priority) + finally: + if self._ec_id: + self.context.cfg.DropECReservations(self._ec_id) + finally: + self.context.glm.release(locking.LEVEL_CLUSTER) finally: self._cbs = None @@ -513,8 +401,12 @@ class Processor(object): self.Log(" - INFO: %s" % message) def GetECId(self): + """Returns the current execution context ID. + + """ if not self._ec_id: - errors.ProgrammerError("Tried to use execution context id when not set") + raise errors.ProgrammerError("Tried to use execution context id when" + " not set") return self._ec_id