Revision 5fd6b694 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
895 | 895 |
|
896 | 896 |
""" |
897 | 897 |
assert op in job.ops |
898 |
assert op.status in (constants.OP_STATUS_QUEUED, |
|
899 |
constants.OP_STATUS_WAITLOCK) |
|
900 |
|
|
901 |
update = False |
|
898 | 902 |
|
899 |
op.status = constants.OP_STATUS_WAITLOCK |
|
900 | 903 |
op.result = None |
901 |
op.start_timestamp = TimeStampNow() |
|
904 |
|
|
905 |
if op.status == constants.OP_STATUS_QUEUED: |
|
906 |
op.status = constants.OP_STATUS_WAITLOCK |
|
907 |
update = True |
|
908 |
|
|
909 |
if op.start_timestamp is None: |
|
910 |
op.start_timestamp = TimeStampNow() |
|
911 |
update = True |
|
902 | 912 |
|
903 | 913 |
if job.start_timestamp is None: |
904 | 914 |
job.start_timestamp = op.start_timestamp |
915 |
update = True |
|
916 |
|
|
917 |
assert op.status == constants.OP_STATUS_WAITLOCK |
|
918 |
|
|
919 |
return update |
|
905 | 920 |
|
906 | 921 |
def _ExecOpCodeUnlocked(self, opctx): |
907 | 922 |
"""Processes one opcode and returns the result. |
... | ... | |
929 | 944 |
if op.status == constants.OP_STATUS_CANCELING: |
930 | 945 |
return (constants.OP_STATUS_CANCELING, None) |
931 | 946 |
|
932 |
return (constants.OP_STATUS_QUEUED, None) |
|
947 |
# Stay in waitlock while trying to re-acquire lock |
|
948 |
return (constants.OP_STATUS_WAITLOCK, None) |
|
933 | 949 |
except CancelJob: |
934 | 950 |
logging.exception("%s: Canceling job", opctx.log_prefix) |
935 | 951 |
assert op.status == constants.OP_STATUS_CANCELING |
... | ... | |
964 | 980 |
# Is a previous opcode still pending? |
965 | 981 |
if job.cur_opctx: |
966 | 982 |
opctx = job.cur_opctx |
983 |
job.cur_opctx = None |
|
967 | 984 |
else: |
968 | 985 |
if __debug__ and _nextop_fn: |
969 | 986 |
_nextop_fn() |
... | ... | |
974 | 991 |
# Consistency check |
975 | 992 |
assert compat.all(i.status in (constants.OP_STATUS_QUEUED, |
976 | 993 |
constants.OP_STATUS_CANCELED) |
977 |
for i in job.ops[opctx.index:]) |
|
994 |
for i in job.ops[opctx.index + 1:])
|
|
978 | 995 |
|
979 | 996 |
assert op.status in (constants.OP_STATUS_QUEUED, |
980 | 997 |
constants.OP_STATUS_WAITLOCK, |
... | ... | |
985 | 1002 |
|
986 | 1003 |
if op.status != constants.OP_STATUS_CANCELED: |
987 | 1004 |
# Prepare to start opcode |
988 |
self._MarkWaitlock(job, op) |
|
1005 |
if self._MarkWaitlock(job, op): |
|
1006 |
# Write to disk |
|
1007 |
queue.UpdateJobUnlocked(job) |
|
989 | 1008 |
|
990 | 1009 |
assert op.status == constants.OP_STATUS_WAITLOCK |
991 | 1010 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK |
992 |
|
|
993 |
# Write to disk |
|
994 |
queue.UpdateJobUnlocked(job) |
|
1011 |
assert job.start_timestamp and op.start_timestamp |
|
995 | 1012 |
|
996 | 1013 |
logging.info("%s: opcode %s waiting for locks", |
997 | 1014 |
opctx.log_prefix, opctx.summary) |
... | ... | |
1005 | 1022 |
op.status = op_status |
1006 | 1023 |
op.result = op_result |
1007 | 1024 |
|
1008 |
if op.status == constants.OP_STATUS_QUEUED:
|
|
1025 |
if op.status == constants.OP_STATUS_WAITLOCK:
|
|
1009 | 1026 |
# Couldn't get locks in time |
1010 | 1027 |
assert not op.end_timestamp |
1011 | 1028 |
else: |
... | ... | |
1018 | 1035 |
else: |
1019 | 1036 |
assert op.status in constants.OPS_FINALIZED |
1020 | 1037 |
|
1021 |
if op.status == constants.OP_STATUS_QUEUED:
|
|
1038 |
if op.status == constants.OP_STATUS_WAITLOCK:
|
|
1022 | 1039 |
finalize = False |
1023 | 1040 |
|
1024 |
opctx.CheckPriorityIncrease() |
|
1041 |
if opctx.CheckPriorityIncrease(): |
|
1042 |
# Priority was changed, need to update on-disk file |
|
1043 |
queue.UpdateJobUnlocked(job) |
|
1025 | 1044 |
|
1026 | 1045 |
# Keep around for another round |
1027 | 1046 |
job.cur_opctx = opctx |
... | ... | |
1030 | 1049 |
op.priority >= constants.OP_PRIO_HIGHEST) |
1031 | 1050 |
|
1032 | 1051 |
# In no case must the status be finalized here |
1033 |
assert job.CalcStatus() == constants.JOB_STATUS_QUEUED |
|
1034 |
|
|
1035 |
queue.UpdateJobUnlocked(job) |
|
1052 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK |
|
1036 | 1053 |
|
1037 | 1054 |
else: |
1038 | 1055 |
# Ensure all opcodes so far have been successful |
Also available in: Unified diff