@param result: the opcode result
"""
- not_marked = True
- for op in self.ops:
- if op.status in constants.OPS_FINALIZED:
- assert not_marked, "Finalized opcodes found after non-finalized ones"
- continue
- op.status = status
- op.result = result
- not_marked = False
+ try:
+ not_marked = True
+ for op in self.ops:
+ if op.status in constants.OPS_FINALIZED:
+ assert not_marked, "Finalized opcodes found after non-finalized ones"
+ continue
+ op.status = status
+ op.result = result
+ not_marked = False
+ finally:
+ self.queue.UpdateJobUnlocked(self)
class _OpExecCallbacks(mcpu.OpExecCbBase):
except CancelJob:
queue.acquire()
try:
- queue.CancelJobUnlocked(job)
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+ "Job canceled by request")
finally:
queue.release()
except errors.GenericError, err:
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
- try:
- job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
- "Unclean master daemon shutdown")
- finally:
- self.UpdateJobUnlocked(job)
+ job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+ "Unclean master daemon shutdown")
logging.info("Job queue inspection finished")
finally:
return (False, "Job %s is no longer waiting in the queue" % job.id)
if job_status == constants.JOB_STATUS_QUEUED:
- self.CancelJobUnlocked(job)
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+ "Job canceled by request")
return (True, "Job %s canceled" % job.id)
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
- try:
- job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
- finally:
- self.UpdateJobUnlocked(job)
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
return (True, "Job %s will be canceled" % job.id)
@_RequireOpenQueue
- def CancelJobUnlocked(self, job):
- """Marks a job as canceled.
-
- """
- try:
- job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
- "Job canceled by request")
- finally:
- self.UpdateJobUnlocked(job)
-
- @_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
"""Archives jobs.