@ivar end_timestamp: timestamp for the end of the execution
"""
+ __slots__ = ["input", "status", "result", "log",
+ "start_timestamp", "end_timestamp",
+ "__weakref__"]
+
def __init__(self, op):
"""Constructor for the _QueuedOpCode.
@ivar change: a Condition variable we use for waiting for job changes
"""
+ __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+ "received_timestamp", "start_timestamp", "end_timestamp",
+ "change",
+ "__weakref__"]
+
def __init__(self, queue, job_id, ops):
"""Constructor for the _QueuedJob.
return entries
+ def MarkUnfinishedOps(self, status, result):
+ """Mark unfinished opcodes with a given status and result.
+
+ This is a utility function for marking all running or waiting to
+ be run opcodes with a given status and result. Opcodes whose status
+ is already in C{constants.OPS_FINALIZED} are not changed.
+
+ Finalized opcodes are expected to come before any non-finalized
+ one in C{self.ops}; this is checked with an assertion.
+
+ @param status: a given opcode status
+ @param result: the opcode result
+
+ """
+ not_marked = True  # stays True until the first opcode has been updated
+ for op in self.ops:
+ if op.status in constants.OPS_FINALIZED:
+ assert not_marked, "Finalized opcodes found after non-finalized ones"
+ continue
+ op.status = status
+ op.result = result
+ not_marked = False
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
count = len(job.ops)
for idx, op in enumerate(job.ops):
op_summary = op.input.Summary()
+ if op.status == constants.OP_STATUS_SUCCESS:
+ # this is a job that was partially completed before master
+ # daemon shutdown, so it can be expected that some opcodes
+ # are already completed successfully (if any did error
+ # out, then the whole job should have been aborted and not
+ # resubmitted for processing)
+ logging.info("Op %s/%s: opcode %s already processed, skipping",
+ idx + 1, count, op_summary)
+ continue
try:
logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
op_summary)
queue.acquire()
try:
try:
- job.run_op_idx = -1
+ job.run_op_index = -1
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally:
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_ERROR
- op.result = "Unclean master daemon shutdown"
+ job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+ "Unclean master daemon shutdown")
finally:
self.UpdateJobUnlocked(job)
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_CANCELING
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
finally:
self.UpdateJobUnlocked(job)
return (True, "Job %s will be canceled" % job.id)
"""
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_CANCELED
- op.result = "Job canceled by request"
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+ "Job canceled by request")
finally:
self.UpdateJobUnlocked(job)