"""Calculate timeouts for lock attempts.
"""
- running_sum = 0
result = [1.0]
# Wait for a total of at least 150s before doing a blocking acquire
"_random_fn",
"_start_time",
"_time_fn",
+ "_running_timeout",
]
_TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
self._time_fn = _time_fn
self._random_fn = _random_fn
- self._start_time = None
+ try:
+ timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+ except IndexError:
+ # No more timeouts, do blocking acquire
+ timeout = None
+
+ self._running_timeout = locking.RunningTimeout(timeout, False,
+ _time_fn=_time_fn)
def NextAttempt(self):
"""Returns the strategy for the next attempt.
"""Returns the remaining timeout.
"""
- try:
- timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
- except IndexError:
- # No more timeouts, do blocking acquire
- return None
+ timeout = self._running_timeout.Remaining()
- # Get start time on first calculation
- if self._start_time is None:
- self._start_time = self._time_fn()
+ if timeout is not None:
+ # Add a small variation (-/+ 5%) to timeout. This helps in situations
+ # where two or more jobs are fighting for the same lock(s).
+ variation_range = timeout * 0.1
+ timeout += ((self._random_fn() * variation_range) -
+ (variation_range * 0.5))
- # Calculate remaining time for this attempt
- remaining_timeout = self._start_time + timeout - self._time_fn()
+ return timeout
- # Add a small variation (-/+ 5%) to timeouts. This helps in situations
- # where two or more jobs are fighting for the same lock(s).
- variation_range = remaining_timeout * 0.1
- remaining_timeout += ((self._random_fn() * variation_range) -
- (variation_range * 0.5))
- assert remaining_timeout >= 0.0, "Timeout must be positive"
-
- return remaining_timeout
-
-
-class OpExecCbBase:
+class OpExecCbBase: # pylint: disable-msg=W0232
"""Base class for OpCode execution callbacks.
"""
opcodes.OpRemoveNode: cmdlib.LURemoveNode,
opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
- opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
+ opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
# instance lu
opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
# exports lu
opcodes.OpQueryExports: cmdlib.LUQueryExports,
+ opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
opcodes.OpExportInstance: cmdlib.LUExportInstance,
opcodes.OpRemoveExport: cmdlib.LURemoveExport,
# tags lu
# test lu
opcodes.OpTestDelay: cmdlib.LUTestDelay,
opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
+ opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
}
- def __init__(self, context):
+ def __init__(self, context, ec_id):
"""Constructor for Processor
+ @type context: GanetiContext
+ @param context: global Ganeti context
+ @type ec_id: string
+ @param ec_id: execution context identifier
+
"""
self.context = context
+ self._ec_id = ec_id
self._cbs = None
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
elif isinstance(names, basestring):
parts.append(names)
else:
- parts.append(",".join(names))
+ parts.append(",".join(sorted(names)))
if shared:
parts.append("shared")
hm = HooksMaster(self.rpc.call_hooks_runner, lu)
h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
- self._Feedback, None)
+ self.Log, None)
if getattr(lu.op, "dry_run", False):
# in this mode, no post-hooks are run, and the config is not
return lu.dry_run_result
try:
- result = lu.Exec(self._Feedback)
+ result = lu.Exec(self.Log)
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
- self._Feedback, result)
+ self.Log, result)
finally:
# FIXME: This needs locks if not lu_class.REQ_BGL
if write_count != self.context.cfg.write_count:
except errors.LockError:
raise errors.OpPrereqError(
"Couldn't add locks (%s), probably because of a race condition"
- " with another job, who added them first" % add_locks)
+ " with another job, who added them first" % add_locks,
+ errors.ECODE_FAULT)
acquired = add_locks
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
- return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
- timeout_strategy.CalcRemainingTimeout)
+ try:
+ return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+ timeout_strategy.CalcRemainingTimeout)
+ finally:
+ if self._ec_id:
+ self.context.cfg.DropECReservations(self._ec_id)
+
finally:
self.context.glm.release(locking.LEVEL_CLUSTER)
finally:
self._cbs = None
- def _Feedback(self, *args):
+ def Log(self, *args):
"""Forward call to feedback callback function.
"""
"""
logging.debug("Step %d/%d %s", current, total, message)
- self._Feedback("STEP %d/%d %s" % (current, total, message))
+ self.Log("STEP %d/%d %s" % (current, total, message))
def LogWarning(self, message, *args, **kwargs):
"""Log a warning to the logs and the user.
message = message % tuple(args)
if message:
logging.warning(message)
- self._Feedback(" - WARNING: %s" % message)
+ self.Log(" - WARNING: %s" % message)
if "hint" in kwargs:
- self._Feedback(" Hint: %s" % kwargs["hint"])
+ self.Log(" Hint: %s" % kwargs["hint"])
def LogInfo(self, message, *args):
"""Log an informational message to the logs and the user.
if args:
message = message % tuple(args)
logging.info(message)
- self._Feedback(" - INFO: %s" % message)
+ self.Log(" - INFO: %s" % message)
+
+ def GetECId(self):
+ if not self._ec_id:
+ errors.ProgrammerError("Tried to use execution context id when not set")
+ return self._ec_id
class HooksMaster(object):