Revision 47099cd1 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
312 | 312 |
|
313 | 313 |
if op.status == constants.OP_STATUS_QUEUED: |
314 | 314 |
pass |
315 |
elif op.status == constants.OP_STATUS_WAITLOCK:
|
|
316 |
status = constants.JOB_STATUS_WAITLOCK
|
|
315 |
elif op.status == constants.OP_STATUS_WAITING:
|
|
316 |
status = constants.JOB_STATUS_WAITING
|
|
317 | 317 |
elif op.status == constants.OP_STATUS_RUNNING: |
318 | 318 |
status = constants.JOB_STATUS_RUNNING |
319 | 319 |
elif op.status == constants.OP_STATUS_CANCELING: |
... | ... | |
461 | 461 |
self.Finalize() |
462 | 462 |
return (True, "Job %s canceled" % self.id) |
463 | 463 |
|
464 |
elif status == constants.JOB_STATUS_WAITLOCK:
|
|
464 |
elif status == constants.JOB_STATUS_WAITING:
|
|
465 | 465 |
# The worker will notice the new status and cancel the job |
466 | 466 |
self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) |
467 | 467 |
return (True, "Job %s will be canceled" % self.id) |
... | ... | |
507 | 507 |
This is called from the mcpu code as a notifier function, when the LU is |
508 | 508 |
finally about to start the Exec() method. Of course, to have end-user |
509 | 509 |
visible results, the opcode must be initially (before calling into |
510 |
Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
|
|
510 |
Processor.ExecOpCode) set to OP_STATUS_WAITING.
|
|
511 | 511 |
|
512 | 512 |
""" |
513 | 513 |
assert self._op in self._job.ops |
514 |
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
|
|
514 |
assert self._op.status in (constants.OP_STATUS_WAITING,
|
|
515 | 515 |
constants.OP_STATUS_CANCELING) |
516 | 516 |
|
517 | 517 |
# Cancel here if we were asked to |
... | ... | |
555 | 555 |
"""Check whether job has been cancelled. |
556 | 556 |
|
557 | 557 |
""" |
558 |
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
|
|
558 |
assert self._op.status in (constants.OP_STATUS_WAITING,
|
|
559 | 559 |
constants.OP_STATUS_CANCELING) |
560 | 560 |
|
561 | 561 |
# Cancel here if we were asked to |
... | ... | |
616 | 616 |
# no changes. |
617 | 617 |
if (status not in (constants.JOB_STATUS_QUEUED, |
618 | 618 |
constants.JOB_STATUS_RUNNING, |
619 |
constants.JOB_STATUS_WAITLOCK) or
|
|
619 |
constants.JOB_STATUS_WAITING) or
|
|
620 | 620 |
job_info != self._prev_job_info or |
621 | 621 |
(log_entries and self._prev_log_serial != log_entries[0][0])): |
622 | 622 |
logging.debug("Job %s changed", job.id) |
... | ... | |
931 | 931 |
""" |
932 | 932 |
assert op in job.ops |
933 | 933 |
assert op.status in (constants.OP_STATUS_QUEUED, |
934 |
constants.OP_STATUS_WAITLOCK)
|
|
934 |
constants.OP_STATUS_WAITING)
|
|
935 | 935 |
|
936 | 936 |
update = False |
937 | 937 |
|
938 | 938 |
op.result = None |
939 | 939 |
|
940 | 940 |
if op.status == constants.OP_STATUS_QUEUED: |
941 |
op.status = constants.OP_STATUS_WAITLOCK
|
|
941 |
op.status = constants.OP_STATUS_WAITING
|
|
942 | 942 |
update = True |
943 | 943 |
|
944 | 944 |
if op.start_timestamp is None: |
... | ... | |
949 | 949 |
job.start_timestamp = op.start_timestamp |
950 | 950 |
update = True |
951 | 951 |
|
952 |
assert op.status == constants.OP_STATUS_WAITLOCK
|
|
952 |
assert op.status == constants.OP_STATUS_WAITING
|
|
953 | 953 |
|
954 | 954 |
return update |
955 | 955 |
|
... | ... | |
1015 | 1015 |
""" |
1016 | 1016 |
op = opctx.op |
1017 | 1017 |
|
1018 |
assert op.status == constants.OP_STATUS_WAITLOCK
|
|
1018 |
assert op.status == constants.OP_STATUS_WAITING
|
|
1019 | 1019 |
|
1020 | 1020 |
timeout = opctx.GetNextLockTimeout() |
1021 | 1021 |
|
... | ... | |
1028 | 1028 |
assert timeout is not None, "Received timeout for blocking acquire" |
1029 | 1029 |
logging.debug("Couldn't acquire locks in %0.6fs", timeout) |
1030 | 1030 |
|
1031 |
assert op.status in (constants.OP_STATUS_WAITLOCK,
|
|
1031 |
assert op.status in (constants.OP_STATUS_WAITING,
|
|
1032 | 1032 |
constants.OP_STATUS_CANCELING) |
1033 | 1033 |
|
1034 | 1034 |
# Was job cancelled while we were waiting for the lock? |
... | ... | |
1036 | 1036 |
return (constants.OP_STATUS_CANCELING, None) |
1037 | 1037 |
|
1038 | 1038 |
# Stay in waitlock while trying to re-acquire lock |
1039 |
return (constants.OP_STATUS_WAITLOCK, None)
|
|
1039 |
return (constants.OP_STATUS_WAITING, None)
|
|
1040 | 1040 |
except CancelJob: |
1041 | 1041 |
logging.exception("%s: Canceling job", opctx.log_prefix) |
1042 | 1042 |
assert op.status == constants.OP_STATUS_CANCELING |
... | ... | |
1091 | 1091 |
for i in job.ops[opctx.index + 1:]) |
1092 | 1092 |
|
1093 | 1093 |
assert op.status in (constants.OP_STATUS_QUEUED, |
1094 |
constants.OP_STATUS_WAITLOCK,
|
|
1094 |
constants.OP_STATUS_WAITING,
|
|
1095 | 1095 |
constants.OP_STATUS_CANCELING) |
1096 | 1096 |
|
1097 | 1097 |
assert (op.priority <= constants.OP_PRIO_LOWEST and |
... | ... | |
1101 | 1101 |
|
1102 | 1102 |
if op.status != constants.OP_STATUS_CANCELING: |
1103 | 1103 |
assert op.status in (constants.OP_STATUS_QUEUED, |
1104 |
constants.OP_STATUS_WAITLOCK)
|
|
1104 |
constants.OP_STATUS_WAITING)
|
|
1105 | 1105 |
|
1106 | 1106 |
# Prepare to start opcode |
1107 | 1107 |
if self._MarkWaitlock(job, op): |
1108 | 1108 |
# Write to disk |
1109 | 1109 |
queue.UpdateJobUnlocked(job) |
1110 | 1110 |
|
1111 |
assert op.status == constants.OP_STATUS_WAITLOCK
|
|
1112 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
|
|
1111 |
assert op.status == constants.OP_STATUS_WAITING
|
|
1112 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITING
|
|
1113 | 1113 |
assert job.start_timestamp and op.start_timestamp |
1114 | 1114 |
assert waitjob is None |
1115 | 1115 |
|
1116 | 1116 |
# Check if waiting for a job is necessary |
1117 | 1117 |
waitjob = self._CheckDependencies(queue, job, opctx) |
1118 | 1118 |
|
1119 |
assert op.status in (constants.OP_STATUS_WAITLOCK,
|
|
1119 |
assert op.status in (constants.OP_STATUS_WAITING,
|
|
1120 | 1120 |
constants.OP_STATUS_CANCELING, |
1121 | 1121 |
constants.OP_STATUS_ERROR) |
1122 | 1122 |
|
... | ... | |
1138 | 1138 |
|
1139 | 1139 |
assert not waitjob |
1140 | 1140 |
|
1141 |
if op.status == constants.OP_STATUS_WAITLOCK:
|
|
1141 |
if op.status == constants.OP_STATUS_WAITING:
|
|
1142 | 1142 |
# Couldn't get locks in time |
1143 | 1143 |
assert not op.end_timestamp |
1144 | 1144 |
else: |
... | ... | |
1151 | 1151 |
else: |
1152 | 1152 |
assert op.status in constants.OPS_FINALIZED |
1153 | 1153 |
|
1154 |
if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
|
|
1154 |
if op.status == constants.OP_STATUS_WAITING or waitjob:
|
|
1155 | 1155 |
finalize = False |
1156 | 1156 |
|
1157 | 1157 |
if not waitjob and opctx.CheckPriorityIncrease(): |
... | ... | |
1165 | 1165 |
op.priority >= constants.OP_PRIO_HIGHEST) |
1166 | 1166 |
|
1167 | 1167 |
# In no case must the status be finalized here |
1168 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
|
|
1168 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITING
|
|
1169 | 1169 |
|
1170 | 1170 |
else: |
1171 | 1171 |
# Ensure all opcodes so far have been successful |
... | ... | |
1572 | 1572 |
restartjobs.append(job) |
1573 | 1573 |
|
1574 | 1574 |
elif status in (constants.JOB_STATUS_RUNNING, |
1575 |
constants.JOB_STATUS_WAITLOCK,
|
|
1575 |
constants.JOB_STATUS_WAITING,
|
|
1576 | 1576 |
constants.JOB_STATUS_CANCELING): |
1577 | 1577 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
1578 | 1578 |
|
1579 |
if status == constants.JOB_STATUS_WAITLOCK:
|
|
1579 |
if status == constants.JOB_STATUS_WAITING:
|
|
1580 | 1580 |
# Restart job |
1581 | 1581 |
job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) |
1582 | 1582 |
restartjobs.append(job) |
Also available in: Unified diff