from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
+from ganeti import utils
-class _LockAcquireTimeout(Exception):
- """Internal exception to report timeouts on acquiring locks.
+class LockAcquireTimeout(Exception):
+ """Exception to report timeouts on acquiring locks.
+
+ Raised when acquiring locks through the lock manager returns C{None}, i.e.
+ the requested locks could not be acquired in the specified amount of time.
"""
-class _LockTimeoutStrategy(object):
+def _CalculateLockAttemptTimeouts():
+ """Calculate timeouts for lock attempts.
+
+ The list starts with a timeout of one second. Every following timeout is
+ computed from the previous one as C{(previous * 1.05) ** 1.25} and then
+ limited to the range from 0.1 to 10.0 seconds. Timeouts are appended until
+ their sum reaches at least 150 seconds.
+
+ @rtype: list of float
+ @return: Timeouts in seconds, one entry per lock attempt
+
+ """
+ result = [1.0]
+
+ # Wait for a total of at least 150s before doing a blocking acquire
+ while sum(result) < 150.0:
+ timeout = (result[-1] * 1.05) ** 1.25
+
+ # Cap timeout at 10 seconds. This gives other jobs a chance to run
+ # even if we're still trying to get our locks, before finally moving
+ # to a blocking acquire.
+ if timeout > 10.0:
+ timeout = 10.0
+
+ elif timeout < 0.1:
+ # Lower boundary for safety
+ timeout = 0.1
+
+ result.append(timeout)
+
+ return result
+
+
+class LockAttemptTimeoutStrategy(object):
"""Class with lock acquire timeout strategy.
+
+ Each call to L{NextAttempt} hands out the next entry of a precomputed list
+ of timeouts, with a small random variation applied. Once the list is used
+ up, C{None} is returned to request a blocking acquire.
"""
__slots__ = [
- "_attempts",
+ "_timeouts",
"_random_fn",
- "_start_time",
+ "_time_fn",
]
- _MAX_ATTEMPTS = 10
- """How many retries before going into blocking mode"""
+ # Computed once when the class body is executed; every instance iterates
+ # over this same list
+ _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
- _ATTEMPT_FACTOR = 1.75
- """Factor between attempts"""
-
- def __init__(self, _random_fn=None):
+ def __init__(self, _time_fn=time.time, _random_fn=random.random):
"""Initializes this class.
+ @param _time_fn: Time function for unittests
@param _random_fn: Random number generator for unittests
"""
object.__init__(self)
- self._start_time = None
- self._attempts = 0
-
- if _random_fn is None:
- self._random_fn = random.random
- else:
- self._random_fn = _random_fn
+ # Each instance walks through the timeout list with its own iterator
+ self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
+ # NOTE(review): _time_fn is stored but not used by the code visible here
+ self._time_fn = _time_fn
+ self._random_fn = _random_fn
def NextAttempt(self):
- """Advances to the next attempt.
+ """Returns the timeout for the next attempt.
+
+ @rtype: float or None
+ @return: Next timeout from the precomputed list with a random variation
+ of -/+ 5% applied, or C{None} if all timeouts have been used and a
+ blocking acquire should be done
+
"""
- assert self._attempts >= 0
- self._attempts += 1
-
- def CalcRemainingTimeout(self):
- """Returns the remaining timeout.
-
- """
- assert self._attempts >= 0
-
- if self._attempts == self._MAX_ATTEMPTS:
- # Only blocking acquires after 10 retries
- return None
-
- if self._attempts > self._MAX_ATTEMPTS:
- raise RuntimeError("Blocking acquire ran into timeout")
-
- # Get start time on first calculation
- if self._start_time is None:
- self._start_time = time.time()
-
- # Calculate remaining time for this attempt
- timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
- time.time())
-
- if timeout > 10.0:
- # Cap timeout at 10 seconds. This gives other jobs a chance to run
- # even if we're still trying to get our locks, before finally moving
- # to a blocking acquire.
- timeout = 10.0
-
- elif timeout < 0.1:
- # Lower boundary
- timeout = 0.1
-
- # Add a small variation (-/+ 5%) to timeouts. This helps in situations
- # where two or more jobs are fighting for the same lock(s).
- variation_range = timeout * 0.1
- timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
-
- assert timeout >= 0.0, "Timeout must be positive"
+ try:
+ timeout = self._timeouts.next()
+ except StopIteration:
+ # No more timeouts, do blocking acquire
+ timeout = None
+
+ if timeout is not None:
+ # Add a small variation (-/+ 5%) to timeout. This helps in situations
+ # where two or more jobs are fighting for the same lock(s).
+ variation_range = timeout * 0.1
+ timeout += ((self._random_fn() * variation_range) -
+ (variation_range * 0.5))
return timeout
-class OpExecCbBase:
+class OpExecCbBase: # pylint: disable-msg=W0232
"""Base class for OpCode execution callbacks.
"""
"""
- def ReportLocks(self, msg):
- """Report lock operations.
+ def CheckCancel(self):
+ """Check whether job has been cancelled.
"""
opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
+ opcodes.OpQuery: cmdlib.LUQuery,
+ opcodes.OpQueryFields: cmdlib.LUQueryFields,
# node lu
opcodes.OpAddNode: cmdlib.LUAddNode,
opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
opcodes.OpRemoveNode: cmdlib.LURemoveNode,
opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
- opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
+ opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
# instance lu
opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
+ # node group lu
+ opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
# os lu
opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
# exports lu
opcodes.OpQueryExports: cmdlib.LUQueryExports,
+ opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
opcodes.OpExportInstance: cmdlib.LUExportInstance,
opcodes.OpRemoveExport: cmdlib.LURemoveExport,
# tags lu
# test lu
opcodes.OpTestDelay: cmdlib.LUTestDelay,
opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
+ opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
}
- def __init__(self, context):
+ def __init__(self, context, ec_id):
"""Constructor for Processor
+ @type context: GanetiContext
+ @param context: global Ganeti context
+ @type ec_id: string
+ @param ec_id: execution context identifier
+
"""
self.context = context
+ # A false value is treated as "no execution context id set" by the
+ # methods reading this attribute
+ self._ec_id = ec_id
+ # Runtime callbacks; only set while ExecOpCode is running
self._cbs = None
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
- def _ReportLocks(self, level, names, shared, timeout, acquired, result):
- """Reports lock operations.
-
- @type level: int
- @param level: Lock level
- @type names: list or string
- @param names: Lock names
- @type shared: bool
- @param shared: Whether the locks should be acquired in shared mode
- @type timeout: None or float
- @param timeout: Timeout for acquiring the locks
- @type acquired: bool
- @param acquired: Whether the locks have already been acquired
- @type result: None or set
- @param result: Result from L{locking.GanetiLockManager.acquire}
-
- """
- parts = []
-
- # Build message
- if acquired:
- if result is None:
- parts.append("timeout")
- else:
- parts.append("acquired")
- else:
- parts.append("waiting")
- if timeout is None:
- parts.append("blocking")
- else:
- parts.append("timeout=%0.6fs" % timeout)
-
- parts.append(locking.LEVEL_NAMES[level])
-
- if names == locking.ALL_SET:
- parts.append("ALL")
- elif isinstance(names, basestring):
- parts.append(names)
- else:
- parts.append(",".join(names))
-
- if shared:
- parts.append("shared")
- else:
- parts.append("exclusive")
-
- msg = "/".join(parts)
-
- logging.debug("LU locks %s", msg)
-
- if self._cbs:
- self._cbs.ReportLocks(msg)
-
- def _AcquireLocks(self, level, names, shared, timeout):
+ def _AcquireLocks(self, level, names, shared, timeout, priority):
"""Acquires locks via the Ganeti lock manager.
@type level: int
@param shared: Whether the locks should be acquired in shared mode
@type timeout: None or float
@param timeout: Timeout for acquiring the locks
+ @type priority: number or None
+ @param priority: Priority for acquiring lock(s)
+ @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+ amount of time
"""
- self._ReportLocks(level, names, shared, timeout, False, None)
+ # Check for job cancellation before trying to acquire the locks
+ if self._cbs:
+ self._cbs.CheckCancel()
acquired = self.context.glm.acquire(level, names, shared=shared,
- timeout=timeout)
+ timeout=timeout, priority=priority)
- self._ReportLocks(level, names, shared, timeout, True, acquired)
+ # A result of None from the lock manager is reported as a timeout
+ if acquired is None:
+ raise LockAcquireTimeout()
return acquired
hm = HooksMaster(self.rpc.call_hooks_runner, lu)
h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
- self._Feedback, None)
+ self.Log, None)
if getattr(lu.op, "dry_run", False):
# in this mode, no post-hooks are run, and the config is not
return lu.dry_run_result
try:
- result = lu.Exec(self._Feedback)
+ result = lu.Exec(self.Log)
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
- self._Feedback, result)
+ self.Log, result)
finally:
# FIXME: This needs locks if not lu_class.REQ_BGL
if write_count != self.context.cfg.write_count:
return result
- def _LockAndExecLU(self, lu, level, calc_timeout):
+ def _LockAndExecLU(self, lu, level, calc_timeout, priority):
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
needed_locks = lu.needed_locks[level]
acquired = self._AcquireLocks(level, needed_locks, share,
- calc_timeout())
-
- if acquired is None:
- raise _LockAcquireTimeout()
-
- lu.acquired_locks[level] = acquired
-
+ calc_timeout(), priority)
else:
# Adding locks
add_locks = lu.add_locks[level]
except errors.LockError:
raise errors.OpPrereqError(
"Couldn't add locks (%s), probably because of a race condition"
- " with another job, who added them first" % add_locks)
+ " with another job, who added them first" % add_locks,
+ errors.ECODE_FAULT)
+
+ acquired = add_locks
- lu.acquired_locks[level] = add_locks
try:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+ lu.acquired_locks[level] = acquired
+
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
self.context.glm.release(level)
else:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
return result
- def ExecOpCode(self, op, cbs):
+ def ExecOpCode(self, op, cbs, timeout=None, priority=None):
"""Execute an opcode.
@type op: an OpCode instance
@param op: the opcode to be executed
@type cbs: L{OpExecCbBase}
@param cbs: Runtime callbacks
+ @type timeout: float or None
+ @param timeout: Maximum time to acquire all locks, None for no timeout
+ @type priority: number or None
+ @param priority: Priority for acquiring lock(s)
+ @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+ amount of time
+ @raise errors.ProgrammerError: In case C{op} is not an opcode instance
+ @raise errors.OpCodeUnknown: In case the opcode's class has no entry in
+ the dispatch table
"""
if not isinstance(op, opcodes.OpCode):
raise errors.ProgrammerError("Non-opcode instance passed"
" to ExecOpcode")
+ lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
+ if lu_class is None:
+ raise errors.OpCodeUnknown("Unknown opcode")
+
+ # calc_timeout() is called anew for every lock acquisition; without an
+ # overall timeout it always returns None.
+ # NOTE(review): presumably RunningTimeout.Remaining returns what is left
+ # of "timeout" at the time of the call -- confirm in utils
+ if timeout is None:
+ calc_timeout = lambda: None
+ else:
+ calc_timeout = utils.RunningTimeout(timeout, False).Remaining
+
self._cbs = cbs
try:
- lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
- if lu_class is None:
- raise errors.OpCodeUnknown("Unknown opcode")
-
- timeout_strategy = _LockTimeoutStrategy()
- calc_timeout = timeout_strategy.CalcRemainingTimeout
+ # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+ # and in a shared fashion otherwise (to prevent concurrent run with
+ # an exclusive LU).
+ self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+ not lu_class.REQ_BGL, calc_timeout(),
+ priority)
+ try:
+ lu = lu_class(self, op, self.context, self.rpc)
+ lu.ExpandNames()
+ assert lu.needed_locks is not None, "needed_locks not set by LU"
- while True:
try:
- # Acquire the Big Ganeti Lock exclusively if this LU requires it,
- # and in a shared fashion otherwise (to prevent concurrent run with
- # an exclusive LU.
- if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
- not lu_class.REQ_BGL, calc_timeout()) is None:
- raise _LockAcquireTimeout()
-
- try:
- lu = lu_class(self, op, self.context, self.rpc)
- lu.ExpandNames()
- assert lu.needed_locks is not None, "needed_locks not set by LU"
-
- return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
- finally:
- self.context.glm.release(locking.LEVEL_CLUSTER)
-
- except _LockAcquireTimeout:
- # Timeout while waiting for lock, try again
- pass
-
- timeout_strategy.NextAttempt()
-
+ return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+ priority)
+ finally:
+ if self._ec_id:
+ self.context.cfg.DropECReservations(self._ec_id)
+ finally:
+ self.context.glm.release(locking.LEVEL_CLUSTER)
finally:
self._cbs = None
- def _Feedback(self, *args):
+ def Log(self, *args):
"""Forward call to feedback callback function.
"""
"""
logging.debug("Step %d/%d %s", current, total, message)
- self._Feedback("STEP %d/%d %s" % (current, total, message))
+ self.Log("STEP %d/%d %s" % (current, total, message))
def LogWarning(self, message, *args, **kwargs):
"""Log a warning to the logs and the user.
message = message % tuple(args)
if message:
logging.warning(message)
- self._Feedback(" - WARNING: %s" % message)
+ self.Log(" - WARNING: %s" % message)
if "hint" in kwargs:
- self._Feedback(" Hint: %s" % kwargs["hint"])
+ self.Log(" Hint: %s" % kwargs["hint"])
def LogInfo(self, message, *args):
"""Log an informational message to the logs and the user.
if args:
message = message % tuple(args)
logging.info(message)
- self._Feedback(" - INFO: %s" % message)
+ self.Log(" - INFO: %s" % message)
+
+ def GetECId(self):
+   """Returns the execution context identifier.
+
+   @rtype: string
+   @return: Execution context identifier passed to the constructor
+   @raise errors.ProgrammerError: In case no execution context identifier
+     is set
+
+   """
+   if not self._ec_id:
+     # The exception must actually be raised; merely instantiating it would
+     # let callers continue with an unset identifier
+     raise errors.ProgrammerError("Tried to use execution context id when not set")
+   return self._ec_id
class HooksMaster(object):