timestamp = utils.SplitTime(time.time())
self._AppendFeedback(timestamp, log_type, log_msg)
- def CheckCancel(self):
- """Check whether job has been cancelled.
+ def CurrentPriority(self):
+ """Returns current priority for opcode.
"""
assert self._op.status in (constants.OP_STATUS_WAITING,
# Cancel here if we were asked to
self._CheckCancel()
+ return self._op.priority
+
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
# Make sure not to hold queue lock while calling ExecOpCode
result = self.opexec_fn(op.input,
_OpExecCallbacks(self.queue, self.job, op),
- timeout=timeout, priority=op.priority)
+ timeout=timeout)
except mcpu.LockAcquireTimeout:
assert timeout is not None, "Received timeout for blocking acquire"
logging.debug("Couldn't acquire locks in %0.6fs", timeout)
"""
- def CheckCancel(self):
- """Check whether job has been cancelled.
+ def CurrentPriority(self): # pylint: disable=R0201
+ """Returns current priority or C{None}.
"""
+ return None
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
if not self._enable_locks:
raise errors.ProgrammerError("Attempted to use disabled locks")
- def _AcquireLocks(self, level, names, shared, timeout, priority):
+ def _AcquireLocks(self, level, names, shared, timeout):
"""Acquires locks via the Ganeti lock manager.
@type level: int
self._CheckLocksEnabled()
if self._cbs:
- self._cbs.CheckCancel()
+ priority = self._cbs.CurrentPriority()
+ else:
+ priority = None
acquired = self.context.glm.acquire(level, names, shared=shared,
timeout=timeout, priority=priority)
def BuildHooksManager(self, lu):
return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
- def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+ def _LockAndExecLU(self, lu, level, calc_timeout):
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
needed_locks = lu.needed_locks[level]
self._AcquireLocks(level, needed_locks, share,
- calc_timeout(), priority)
+ calc_timeout())
else:
# Adding locks
add_locks = lu.add_locks[level]
errors.ECODE_NOTUNIQUE)
try:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout)
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
self.context.glm.release(level)
else:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout)
return result
- def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+ def ExecOpCode(self, op, cbs, timeout=None):
"""Execute an opcode.
@type op: an OpCode instance
@param cbs: Runtime callbacks
@type timeout: float or None
@param timeout: Maximum time to acquire all locks, None for no timeout
- @type priority: number or None
- @param priority: Priority for acquiring lock(s)
@raise LockAcquireTimeout: In case locks couldn't be acquired in specified
amount of time
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU).
self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
- not lu_class.REQ_BGL, calc_timeout(),
- priority)
+ not lu_class.REQ_BGL, calc_timeout())
elif lu_class.REQ_BGL:
raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
" disabled" % op.OP_ID)
assert lu.needed_locks is not None, "needed_locks not set by LU"
try:
- result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
- priority)
+ result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
finally:
if self._ec_id:
self.context.cfg.DropECReservations(self._ec_id)