X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b6b87034f8ff84b8051a8b3cbcc5f8dd1942c328..21d7df5fef538b66773f1e9b97f9ba35d0c628e4:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index d53bd58..3198780 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -50,7 +50,6 @@ def _CalculateLockAttemptTimeouts(): """Calculate timeouts for lock attempts. """ - running_sum = 0 result = [1.0] # Wait for a total of at least 150s before doing a blocking acquire @@ -81,6 +80,7 @@ class _LockAttemptTimeoutStrategy(object): "_random_fn", "_start_time", "_time_fn", + "_running_timeout", ] _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts() @@ -103,7 +103,14 @@ class _LockAttemptTimeoutStrategy(object): self._time_fn = _time_fn self._random_fn = _random_fn - self._start_time = None + try: + timeout = self._TIMEOUT_PER_ATTEMPT[attempt] + except IndexError: + # No more timeouts, do blocking acquire + timeout = None + + self._running_timeout = locking.RunningTimeout(timeout, False, + _time_fn=_time_fn) def NextAttempt(self): """Returns the strategy for the next attempt. @@ -117,32 +124,19 @@ class _LockAttemptTimeoutStrategy(object): """Returns the remaining timeout. """ - try: - timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt] - except IndexError: - # No more timeouts, do blocking acquire - return None + timeout = self._running_timeout.Remaining() - # Get start time on first calculation - if self._start_time is None: - self._start_time = self._time_fn() + if timeout is not None: + # Add a small variation (-/+ 5%) to timeout. This helps in situations + # where two or more jobs are fighting for the same lock(s). + variation_range = timeout * 0.1 + timeout += ((self._random_fn() * variation_range) - + (variation_range * 0.5)) - # Calculate remaining time for this attempt - remaining_timeout = self._start_time + timeout - self._time_fn() + return timeout - # Add a small variation (-/+ 5%) to timeouts.
This helps in situations - # where two or more jobs are fighting for the same lock(s). - variation_range = remaining_timeout * 0.1 - remaining_timeout += ((self._random_fn() * variation_range) - - (variation_range * 0.5)) - # Make sure timeout is >= 0 - remaining_timeout = max(0.0, remaining_timeout) - - return remaining_timeout - - -class OpExecCbBase: +class OpExecCbBase: # pylint: disable-msg=W0232 """Base class for OpCode execution callbacks. """ @@ -191,6 +185,7 @@ class Processor(object): opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode, opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode, opcodes.OpMigrateNode: cmdlib.LUMigrateNode, + opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy, # instance lu opcodes.OpCreateInstance: cmdlib.LUCreateInstance, opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance, @@ -215,6 +210,7 @@ class Processor(object): opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS, # exports lu opcodes.OpQueryExports: cmdlib.LUQueryExports, + opcodes.OpPrepareExport: cmdlib.LUPrepareExport, opcodes.OpExportInstance: cmdlib.LUExportInstance, opcodes.OpRemoveExport: cmdlib.LURemoveExport, # tags lu @@ -227,11 +223,17 @@ class Processor(object): opcodes.OpTestAllocator: cmdlib.LUTestAllocator, } - def __init__(self, context): + def __init__(self, context, ec_id): """Constructor for Processor + @type context: GanetiContext + @param context: global Ganeti context + @type ec_id: string + @param ec_id: execution context identifier + """ self.context = context + self._ec_id = ec_id self._cbs = None self.rpc = rpc.RpcRunner(context.cfg) self.hmclass = HooksMaster @@ -275,7 +277,7 @@ class Processor(object): elif isinstance(names, basestring): parts.append(names) else: - parts.append(",".join(names)) + parts.append(",".join(sorted(names))) if shared: parts.append("shared") @@ -392,7 +394,8 @@ class Processor(object): except errors.LockError: raise errors.OpPrereqError( "Couldn't add locks (%s), probably because of a race condition" - " with another
job, who added them first" % add_locks) + " with another job, who added them first" % add_locks, + errors.ECODE_FAULT) acquired = add_locks @@ -449,8 +452,13 @@ class Processor(object): lu.ExpandNames() assert lu.needed_locks is not None, "needed_locks not set by LU" - return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, - timeout_strategy.CalcRemainingTimeout) + try: + return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, + timeout_strategy.CalcRemainingTimeout) + finally: + if self._ec_id: + self.context.cfg.DropECReservations(self._ec_id) + finally: self.context.glm.release(locking.LEVEL_CLUSTER) @@ -505,6 +513,15 @@ class Processor(object): logging.info(message) self._Feedback(" - INFO: %s" % message) + def GetECId(self): + """Returns the current execution context ID. + + """ + if not self._ec_id: + raise errors.ProgrammerError("Tried to use execution context id when" + " not set") + return self._ec_id + class HooksMaster(object): """Hooks master.