JOB_STATUS_SUCCESS = "success"
JOB_STATUS_ERROR = "error"
+# OpCode status
+# not yet finalized
OP_STATUS_QUEUED = "queued"
OP_STATUS_WAITLOCK = "waiting"
OP_STATUS_CANCELING = "canceling"
OP_STATUS_RUNNING = "running"
+# finalized
OP_STATUS_CANCELED = "canceled"
OP_STATUS_SUCCESS = "success"
OP_STATUS_ERROR = "error"
+OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
+ OP_STATUS_SUCCESS,
+ OP_STATUS_ERROR])
# Execution log types
ELOG_MESSAGE = "message"
return entries
+ def MarkUnfinishedOps(self, status, result):
+ """Mark unfinished opcodes with a given status and result.
+
+ This is an utility function for marking all running or waiting to
+ be run opcodes with a given status. Opcodes which are already
+ finalised are not changed.
+
+ @param status: a given opcode status
+ @param result: the opcode result
+
+ """
+ not_marked = True
+ for op in self.ops:
+ if op.status in constants.OPS_FINALIZED:
+ assert not_marked, "Finalized opcodes found after non-finalized ones"
+ continue
+ op.status = status
+ op.result = result
+ not_marked = False
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_ERROR
- op.result = "Unclean master daemon shutdown"
+ job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+ "Unclean master daemon shutdown")
finally:
self.UpdateJobUnlocked(job)
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_CANCELING
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
finally:
self.UpdateJobUnlocked(job)
return (True, "Job %s will be canceled" % job.id)
"""
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_CANCELED
- op.result = "Job canceled by request"
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+ "Job canceled by request")
finally:
self.UpdateJobUnlocked(job)