X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/211b613211db900c5cb56884b6479a8332161b85..97b40f39d75679418c8f6739331ffefeff1f3ce0:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index 5bcfc27..4ee6de5 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -41,93 +41,87 @@ from ganeti import locking from ganeti import utils -class _LockAcquireTimeout(Exception): - """Internal exception to report timeouts on acquiring locks. +_OP_PREFIX = "Op" +_LU_PREFIX = "LU" + + +class LockAcquireTimeout(Exception): + """Exception to report timeouts on acquiring locks. + + """ + + +def _CalculateLockAttemptTimeouts(): + """Calculate timeouts for lock attempts. """ + result = [constants.LOCK_ATTEMPTS_MINWAIT] + running_sum = result[0] + # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a + # blocking acquire + while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: + timeout = (result[-1] * 1.05) ** 1.25 -class _LockTimeoutStrategy(object): + # Cap max timeout. This gives other jobs a chance to run even if + # we're still trying to get our locks, before finally moving to a + # blocking acquire. + timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) + # And also cap the lower boundary for safety + timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) + + result.append(timeout) + running_sum += timeout + + return result + + +class LockAttemptTimeoutStrategy(object): """Class with lock acquire timeout strategy. """ __slots__ = [ - "_attempts", + "_timeouts", "_random_fn", - "_start_time", + "_time_fn", ] - _MAX_ATTEMPTS = 10 - """How many retries before going into blocking mode""" - - _ATTEMPT_FACTOR = 1.75 - """Factor between attempts""" + _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts() - def __init__(self, _random_fn=None): + def __init__(self, _time_fn=time.time, _random_fn=random.random): """Initializes this class. + @param _time_fn: Time function for unittests @param _random_fn: Random number generator for unittests """ object.__init__(self) - self._start_time = None - self._attempts = 0 - - if _random_fn is None: - self._random_fn = random.random - else: - self._random_fn = _random_fn + self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT) + self._time_fn = _time_fn + self._random_fn = _random_fn def NextAttempt(self): - """Advances to the next attempt. - - """ - assert self._attempts >= 0 - self._attempts += 1 - - def CalcRemainingTimeout(self): - """Returns the remaining timeout. + """Returns the timeout for the next attempt. """ - assert self._attempts >= 0 - - if self._attempts == self._MAX_ATTEMPTS: - # Only blocking acquires after 10 retries - return None - - if self._attempts > self._MAX_ATTEMPTS: - raise RuntimeError("Blocking acquire ran into timeout") - - # Get start time on first calculation - if self._start_time is None: - self._start_time = time.time() - - # Calculate remaining time for this attempt - timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) - - time.time()) - - if timeout > 10.0: - # Cap timeout at 10 seconds. This gives other jobs a chance to run - # even if we're still trying to get our locks, before finally moving - # to a blocking acquire. - timeout = 10.0 - - elif timeout < 0.1: - # Lower boundary - timeout = 0.1 - - # Add a small variation (-/+ 5%) to timeouts. This helps in situations - # where two or more jobs are fighting for the same lock(s). - variation_range = timeout * 0.1 - timeout += (self._random_fn() * variation_range) - (variation_range * 0.5) - - assert timeout >= 0.0, "Timeout must be positive" + try: + timeout = self._timeouts.next() + except StopIteration: + # No more timeouts, do blocking acquire + timeout = None + + if timeout is not None: + # Add a small variation (-/+ 5%) to timeout. This helps in situations + # where two or more jobs are fighting for the same lock(s). + variation_range = timeout * 0.1 + timeout += ((self._random_fn() * variation_range) - + (variation_range * 0.5)) return timeout -class OpExecCbBase: +class OpExecCbBase: # pylint: disable-msg=W0232 """Base class for OpCode execution callbacks. """ @@ -144,137 +138,51 @@ class OpExecCbBase: """ - def ReportLocks(self, msg): - """Report lock operations. + def CheckCancel(self): + """Check whether job has been cancelled. """ -class Processor(object): - """Object which runs OpCodes""" - DISPATCH_TABLE = { - # Cluster - opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster, - opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster, - opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo, - opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster, - opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues, - opcodes.OpRenameCluster: cmdlib.LURenameCluster, - opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks, - opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams, - opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig, - opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes, - # node lu - opcodes.OpAddNode: cmdlib.LUAddNode, - opcodes.OpQueryNodes: cmdlib.LUQueryNodes, - opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes, - opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage, - opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage, - opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage, - opcodes.OpRemoveNode: cmdlib.LURemoveNode, - opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams, - opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode, - opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode, - opcodes.OpMigrateNode: cmdlib.LUMigrateNode, - # instance lu - opcodes.OpCreateInstance: cmdlib.LUCreateInstance, - opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance, - opcodes.OpRemoveInstance: cmdlib.LURemoveInstance, - opcodes.OpRenameInstance: cmdlib.LURenameInstance, - opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks, - opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance, - opcodes.OpStartupInstance: cmdlib.LUStartupInstance, - opcodes.OpRebootInstance: cmdlib.LURebootInstance, - opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks, - opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks, - opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks, - opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance, - opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance, - opcodes.OpMoveInstance: cmdlib.LUMoveInstance, - opcodes.OpConnectConsole: cmdlib.LUConnectConsole, - opcodes.OpQueryInstances: cmdlib.LUQueryInstances, - opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData, - opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams, - opcodes.OpGrowDisk: cmdlib.LUGrowDisk, - # os lu - opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS, - # exports lu - opcodes.OpQueryExports: cmdlib.LUQueryExports, - opcodes.OpExportInstance: cmdlib.LUExportInstance, - opcodes.OpRemoveExport: cmdlib.LURemoveExport, - # tags lu - opcodes.OpGetTags: cmdlib.LUGetTags, - opcodes.OpSearchTags: cmdlib.LUSearchTags, - opcodes.OpAddTags: cmdlib.LUAddTags, - opcodes.OpDelTags: cmdlib.LUDelTags, - # test lu - opcodes.OpTestDelay: cmdlib.LUTestDelay, - opcodes.OpTestAllocator: cmdlib.LUTestAllocator, - } - - def __init__(self, context): - """Constructor for Processor +def _LUNameForOpName(opname): + """Computes the LU name for a given OpCode name. - """ - self.context = context - self._cbs = None - self.rpc = rpc.RpcRunner(context.cfg) - self.hmclass = HooksMaster + """ + assert opname.startswith(_OP_PREFIX), \ + "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname) - def _ReportLocks(self, level, names, shared, timeout, acquired, result): - """Reports lock operations. + return _LU_PREFIX + opname[len(_OP_PREFIX):] - @type level: int - @param level: Lock level - @type names: list or string - @param names: Lock names - @type shared: bool - @param shared: Whether the locks should be acquired in shared mode - @type timeout: None or float - @param timeout: Timeout for acquiring the locks - @type acquired: bool - @param acquired: Whether the locks have already been acquired - @type result: None or set - @param result: Result from L{locking.GanetiLockManager.acquire} - """ - parts = [] +def _ComputeDispatchTable(): + """Computes the opcode-to-lu dispatch table. - # Build message - if acquired: - if result is None: - parts.append("timeout") - else: - parts.append("acquired") - else: - parts.append("waiting") - if timeout is None: - parts.append("blocking") - else: - parts.append("timeout=%0.6fs" % timeout) + """ + return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__))) + for op in opcodes.OP_MAPPING.values() + if op.WITH_LU) - parts.append(locking.LEVEL_NAMES[level]) - if names == locking.ALL_SET: - parts.append("ALL") - elif isinstance(names, basestring): - parts.append(names) - else: - parts.append(",".join(names)) - - if shared: - parts.append("shared") - else: - parts.append("exclusive") +class Processor(object): + """Object which runs OpCodes""" + DISPATCH_TABLE = _ComputeDispatchTable() - msg = "/".join(parts) + def __init__(self, context, ec_id): + """Constructor for Processor - logging.debug("LU locks %s", msg) + @type context: GanetiContext + @param context: global Ganeti context + @type ec_id: string + @param ec_id: execution context identifier - if self._cbs: - self._cbs.ReportLocks(msg) + """ + self.context = context + self._ec_id = ec_id + self._cbs = None + self.rpc = rpc.RpcRunner(context.cfg) + self.hmclass = HooksMaster - def _AcquireLocks(self, level, names, shared, timeout): + def _AcquireLocks(self, level, names, shared, timeout, priority): """Acquires locks via the Ganeti lock manager. @type level: int @@ -285,14 +193,18 @@ class Processor(object): @param shared: Whether the locks should be acquired in shared mode @type timeout: None or float @param timeout: Timeout for acquiring the locks + @raise LockAcquireTimeout: In case locks couldn't be acquired in specified + amount of time """ - self._ReportLocks(level, names, shared, timeout, False, None) + if self._cbs: + self._cbs.CheckCancel() acquired = self.context.glm.acquire(level, names, shared=shared, - timeout=timeout) + timeout=timeout, priority=priority) - self._ReportLocks(level, names, shared, timeout, True, acquired) + if acquired is None: + raise LockAcquireTimeout() return acquired @@ -305,7 +217,7 @@ class Processor(object): hm = HooksMaster(self.rpc.call_hooks_runner, lu) h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, - self._Feedback, None) + self.Log, None) if getattr(lu.op, "dry_run", False): # in this mode, no post-hooks are run, and the config is not @@ -316,10 +228,10 @@ class Processor(object): return lu.dry_run_result try: - result = lu.Exec(self._Feedback) + result = lu.Exec(self.Log) h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, - self._Feedback, result) + self.Log, result) finally: # FIXME: This needs locks if not lu_class.REQ_BGL if write_count != self.context.cfg.write_count: @@ -327,7 +239,7 @@ class Processor(object): return result - def _LockAndExecLU(self, lu, level, calc_timeout): + def _LockAndExecLU(self, lu, level, calc_timeout, priority): """Execute a Logical Unit, with the needed locks. This is a recursive function that starts locking the given level, and @@ -362,13 +274,7 @@ class Processor(object): needed_locks = lu.needed_locks[level] acquired = self._AcquireLocks(level, needed_locks, share, - calc_timeout()) - - if acquired is None: - raise _LockAcquireTimeout() - - lu.acquired_locks[level] = acquired - + calc_timeout(), priority) else: # Adding locks add_locks = lu.add_locks[level] @@ -379,11 +285,15 @@ class Processor(object): except errors.LockError: raise errors.OpPrereqError( "Couldn't add locks (%s), probably because of a race condition" - " with another job, who added them first" % add_locks) + " with another job, who added them first" % add_locks, + errors.ECODE_FAULT) + + acquired = add_locks - lu.acquired_locks[level] = add_locks try: - result = self._LockAndExecLU(lu, level + 1, calc_timeout) + lu.acquired_locks[level] = acquired + + result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) finally: if level in lu.remove_locks: self.context.glm.remove(level, lu.remove_locks[level]) @@ -392,60 +302,63 @@ class Processor(object): self.context.glm.release(level) else: - result = self._LockAndExecLU(lu, level + 1, calc_timeout) + result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) return result - def ExecOpCode(self, op, cbs): + def ExecOpCode(self, op, cbs, timeout=None, priority=None): """Execute an opcode. @type op: an OpCode instance @param op: the opcode to be executed @type cbs: L{OpExecCbBase} @param cbs: Runtime callbacks + @type timeout: float or None + @param timeout: Maximum time to acquire all locks, None for no timeout + @type priority: number or None + @param priority: Priority for acquiring lock(s) + @raise LockAcquireTimeout: In case locks couldn't be acquired in specified + amount of time """ if not isinstance(op, opcodes.OpCode): raise errors.ProgrammerError("Non-opcode instance passed" " to ExecOpcode") + lu_class = self.DISPATCH_TABLE.get(op.__class__, None) + if lu_class is None: + raise errors.OpCodeUnknown("Unknown opcode") + + if timeout is None: + calc_timeout = lambda: None + else: + calc_timeout = utils.RunningTimeout(timeout, False).Remaining + self._cbs = cbs try: - lu_class = self.DISPATCH_TABLE.get(op.__class__, None) - if lu_class is None: - raise errors.OpCodeUnknown("Unknown opcode") - - timeout_strategy = _LockTimeoutStrategy() - calc_timeout = timeout_strategy.CalcRemainingTimeout + # Acquire the Big Ganeti Lock exclusively if this LU requires it, + # and in a shared fashion otherwise (to prevent concurrent run with + # an exclusive LU. + self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, + not lu_class.REQ_BGL, calc_timeout(), + priority) + try: + lu = lu_class(self, op, self.context, self.rpc) + lu.ExpandNames() + assert lu.needed_locks is not None, "needed_locks not set by LU" - while True: try: - # Acquire the Big Ganeti Lock exclusively if this LU requires it, - # and in a shared fashion otherwise (to prevent concurrent run with - # an exclusive LU. - if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, - not lu_class.REQ_BGL, calc_timeout()) is None: - raise _LockAcquireTimeout() - - try: - lu = lu_class(self, op, self.context, self.rpc) - lu.ExpandNames() - assert lu.needed_locks is not None, "needed_locks not set by LU" - - return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout) - finally: - self.context.glm.release(locking.LEVEL_CLUSTER) - - except _LockAcquireTimeout: - # Timeout while waiting for lock, try again - pass - - timeout_strategy.NextAttempt() - + return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout, + priority) + finally: + if self._ec_id: + self.context.cfg.DropECReservations(self._ec_id) + finally: + self.context.glm.release(locking.LEVEL_CLUSTER) finally: self._cbs = None - def _Feedback(self, *args): + def Log(self, *args): """Forward call to feedback callback function. """ @@ -457,7 +370,7 @@ class Processor(object): """ logging.debug("Step %d/%d %s", current, total, message) - self._Feedback("STEP %d/%d %s" % (current, total, message)) + self.Log("STEP %d/%d %s" % (current, total, message)) def LogWarning(self, message, *args, **kwargs): """Log a warning to the logs and the user. @@ -474,9 +387,9 @@ class Processor(object): message = message % tuple(args) if message: logging.warning(message) - self._Feedback(" - WARNING: %s" % message) + self.Log(" - WARNING: %s" % message) if "hint" in kwargs: - self._Feedback(" Hint: %s" % kwargs["hint"]) + self.Log(" Hint: %s" % kwargs["hint"]) def LogInfo(self, message, *args): """Log an informational message to the logs and the user. @@ -485,7 +398,16 @@ class Processor(object): if args: message = message % tuple(args) logging.info(message) - self._Feedback(" - INFO: %s" % message) + self.Log(" - INFO: %s" % message) + + def GetECId(self): + """Returns the current execution context ID. + + """ + if not self._ec_id: + raise errors.ProgrammerError("Tried to use execution context id when" + " not set") + return self._ec_id class HooksMaster(object):