self.ops = [_QueuedOpCode(op) for op in ops]
self.run_op_index = -1
self.log_serial = 0
+ self.received_timestamp = TimeStampNow()
+ self.start_timestamp = None
+ self.end_timestamp = None
# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
obj.queue = queue
obj.id = state["id"]
obj.run_op_index = state["run_op_index"]
+ obj.received_timestamp = state.get("received_timestamp", None)
+ obj.start_timestamp = state.get("start_timestamp", None)
+ obj.end_timestamp = state.get("end_timestamp", None)
obj.ops = []
obj.log_serial = 0
"id": self.id,
"ops": [op.Serialize() for op in self.ops],
"run_op_index": self.run_op_index,
+ "start_timestamp": self.start_timestamp,
+ "end_timestamp": self.end_timestamp,
+ "received_timestamp": self.received_timestamp,
}
def CalcStatus(self):
op.status = constants.OP_STATUS_RUNNING
op.result = None
op.start_timestamp = TimeStampNow()
+ if idx == 0: # first opcode
+ job.start_timestamp = op.start_timestamp
queue.UpdateJobUnlocked(job)
input_opcode = op.input
try:
try:
job.run_op_idx = -1
+ job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally:
job_id = job.id
row.append([op.status for op in job.ops])
elif fname == "oplog":
row.append([op.log for op in job.ops])
+ elif fname == "opstart":
+ row.append([op.start_timestamp for op in job.ops])
+ elif fname == "opend":
+ row.append([op.end_timestamp for op in job.ops])
+ elif fname == "received_ts":
+ row.append(job.received_timestamp)
+ elif fname == "start_ts":
+ row.append(job.start_timestamp)
+ elif fname == "end_ts":
+ row.append(job.end_timestamp)
elif fname == "summary":
row.append([op.input.Summary() for op in job.ops])
else: