Revision 26d3fd2f lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
176 | 176 |
|
177 | 177 |
""" |
178 | 178 |
# pylint: disable-msg=W0212 |
179 |
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", |
|
179 |
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
|
|
180 | 180 |
"received_timestamp", "start_timestamp", "end_timestamp", |
181 | 181 |
"__weakref__"] |
182 | 182 |
|
... | ... | |
211 | 211 |
|
212 | 212 |
""" |
213 | 213 |
obj.ops_iter = None |
214 |
obj.cur_opctx = None |
|
214 | 215 |
|
215 | 216 |
def __repr__(self): |
216 | 217 |
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), |
... | ... | |
745 | 746 |
return errors.EncodeException(to_encode) |
746 | 747 |
|
747 | 748 |
|
749 |
class _TimeoutStrategyWrapper: |
|
750 |
def __init__(self, fn): |
|
751 |
"""Initializes this class. |
|
752 |
|
|
753 |
""" |
|
754 |
self._fn = fn |
|
755 |
self._next = None |
|
756 |
|
|
757 |
def _Advance(self): |
|
758 |
"""Gets the next timeout if necessary. |
|
759 |
|
|
760 |
""" |
|
761 |
if self._next is None: |
|
762 |
self._next = self._fn() |
|
763 |
|
|
764 |
def Peek(self): |
|
765 |
"""Returns the next timeout. |
|
766 |
|
|
767 |
""" |
|
768 |
self._Advance() |
|
769 |
return self._next |
|
770 |
|
|
771 |
def Next(self): |
|
772 |
"""Returns the current timeout and advances the internal state. |
|
773 |
|
|
774 |
""" |
|
775 |
self._Advance() |
|
776 |
result = self._next |
|
777 |
self._next = None |
|
778 |
return result |
|
779 |
|
|
780 |
|
|
748 | 781 |
class _OpExecContext: |
749 |
def __init__(self, op, index, log_prefix): |
|
782 |
def __init__(self, op, index, log_prefix, timeout_strategy_factory):
|
|
750 | 783 |
"""Initializes this class. |
751 | 784 |
|
752 | 785 |
""" |
... | ... | |
755 | 788 |
self.log_prefix = log_prefix |
756 | 789 |
self.summary = op.input.Summary() |
757 | 790 |
|
791 |
self._timeout_strategy_factory = timeout_strategy_factory |
|
792 |
self._ResetTimeoutStrategy() |
|
793 |
|
|
794 |
def _ResetTimeoutStrategy(self): |
|
795 |
"""Creates a new timeout strategy. |
|
796 |
|
|
797 |
""" |
|
798 |
self._timeout_strategy = \ |
|
799 |
_TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) |
|
800 |
|
|
801 |
def CheckPriorityIncrease(self): |
|
802 |
"""Checks whether priority can and should be increased. |
|
803 |
|
|
804 |
Called when locks couldn't be acquired. |
|
805 |
|
|
806 |
""" |
|
807 |
op = self.op |
|
808 |
|
|
809 |
# Exhausted all retries and next round should not use blocking acquire |
|
810 |
# for locks? |
|
811 |
if (self._timeout_strategy.Peek() is None and |
|
812 |
op.priority > constants.OP_PRIO_HIGHEST): |
|
813 |
logging.debug("Increasing priority") |
|
814 |
op.priority -= 1 |
|
815 |
self._ResetTimeoutStrategy() |
|
816 |
return True |
|
817 |
|
|
818 |
return False |
|
819 |
|
|
820 |
def GetNextLockTimeout(self): |
|
821 |
"""Returns the next lock acquire timeout. |
|
822 |
|
|
823 |
""" |
|
824 |
return self._timeout_strategy.Next() |
|
825 |
|
|
758 | 826 |
|
759 | 827 |
class _JobProcessor(object): |
760 |
def __init__(self, queue, opexec_fn, job): |
|
828 |
def __init__(self, queue, opexec_fn, job, |
|
829 |
_timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): |
|
761 | 830 |
"""Initializes this class. |
762 | 831 |
|
763 | 832 |
""" |
764 | 833 |
self.queue = queue |
765 | 834 |
self.opexec_fn = opexec_fn |
766 | 835 |
self.job = job |
836 |
self._timeout_strategy_factory = _timeout_strategy_factory |
|
767 | 837 |
|
768 | 838 |
@staticmethod |
769 |
def _FindNextOpcode(job): |
|
839 |
def _FindNextOpcode(job, timeout_strategy_factory):
|
|
770 | 840 |
"""Locates the next opcode to run. |
771 | 841 |
|
772 | 842 |
@type job: L{_QueuedJob} |
773 | 843 |
@param job: Job object |
844 |
@param timeout_strategy_factory: Callable to create new timeout strategy |
|
774 | 845 |
|
775 | 846 |
""" |
776 | 847 |
# Create some sort of a cache to speed up locating next opcode for future |
... | ... | |
791 | 862 |
# Found an opcode already marked as running |
792 | 863 |
raise errors.ProgrammerError("Called for job marked as running") |
793 | 864 |
|
794 |
opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) |
|
865 |
opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), |
|
866 |
timeout_strategy_factory) |
|
795 | 867 |
|
796 | 868 |
if op.status == constants.OP_STATUS_CANCELED: |
797 | 869 |
# Cancelled jobs are handled by the caller |
... | ... | |
838 | 910 |
|
839 | 911 |
assert op.status == constants.OP_STATUS_WAITLOCK |
840 | 912 |
|
913 |
timeout = opctx.GetNextLockTimeout() |
|
914 |
|
|
841 | 915 |
try: |
842 | 916 |
# Make sure not to hold queue lock while calling ExecOpCode |
843 | 917 |
result = self.opexec_fn(op.input, |
844 |
_OpExecCallbacks(self.queue, self.job, op)) |
|
918 |
_OpExecCallbacks(self.queue, self.job, op), |
|
919 |
timeout=timeout) |
|
920 |
except mcpu.LockAcquireTimeout: |
|
921 |
assert timeout is not None, "Received timeout for blocking acquire" |
|
922 |
logging.debug("Couldn't acquire locks in %0.6fs", timeout) |
|
923 |
assert op.status == constants.OP_STATUS_WAITLOCK |
|
924 |
return (constants.OP_STATUS_QUEUED, None) |
|
845 | 925 |
except CancelJob: |
846 | 926 |
logging.exception("%s: Canceling job", opctx.log_prefix) |
847 | 927 |
assert op.status == constants.OP_STATUS_CANCELING |
... | ... | |
855 | 935 |
opctx.log_prefix, opctx.summary) |
856 | 936 |
return (constants.OP_STATUS_SUCCESS, result) |
857 | 937 |
|
858 |
def __call__(self): |
|
938 |
def __call__(self, _nextop_fn=None):
|
|
859 | 939 |
"""Continues execution of a job. |
860 | 940 |
|
941 |
@param _nextop_fn: Callback function for tests |
|
861 | 942 |
@rtype: bool |
862 | 943 |
@return: True if job is finished, False if processor needs to be called |
863 | 944 |
again |
... | ... | |
872 | 953 |
try: |
873 | 954 |
opcount = len(job.ops) |
874 | 955 |
|
875 |
opctx = self._FindNextOpcode(job) |
|
956 |
# Is a previous opcode still pending? |
|
957 |
if job.cur_opctx: |
|
958 |
opctx = job.cur_opctx |
|
959 |
else: |
|
960 |
if __debug__ and _nextop_fn: |
|
961 |
_nextop_fn() |
|
962 |
opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) |
|
963 |
|
|
876 | 964 |
op = opctx.op |
877 | 965 |
|
878 | 966 |
# Consistency check |
... | ... | |
884 | 972 |
constants.OP_STATUS_WAITLOCK, |
885 | 973 |
constants.OP_STATUS_CANCELED) |
886 | 974 |
|
975 |
assert (op.priority <= constants.OP_PRIO_LOWEST and |
|
976 |
op.priority >= constants.OP_PRIO_HIGHEST) |
|
977 |
|
|
887 | 978 |
if op.status != constants.OP_STATUS_CANCELED: |
888 | 979 |
# Prepare to start opcode |
889 | 980 |
self._MarkWaitlock(job, op) |
... | ... | |
903 | 994 |
finally: |
904 | 995 |
queue.acquire(shared=1) |
905 | 996 |
|
906 |
# Finalize opcode |
|
907 |
op.end_timestamp = TimeStampNow() |
|
908 | 997 |
op.status = op_status |
909 | 998 |
op.result = op_result |
910 | 999 |
|
911 |
if op.status == constants.OP_STATUS_CANCELING:
|
|
912 |
assert not compat.any(i.status != constants.OP_STATUS_CANCELING
|
|
913 |
for i in job.ops[opctx.index:])
|
|
1000 |
if op.status == constants.OP_STATUS_QUEUED:
|
|
1001 |
# Couldn't get locks in time
|
|
1002 |
assert not op.end_timestamp
|
|
914 | 1003 |
else: |
915 |
assert op.status in constants.OPS_FINALIZED |
|
1004 |
# Finalize opcode |
|
1005 |
op.end_timestamp = TimeStampNow() |
|
916 | 1006 |
|
917 |
# Ensure all opcodes so far have been successful |
|
918 |
assert (opctx.index == 0 or |
|
919 |
compat.all(i.status == constants.OP_STATUS_SUCCESS |
|
920 |
for i in job.ops[:opctx.index])) |
|
1007 |
if op.status == constants.OP_STATUS_CANCELING: |
|
1008 |
assert not compat.any(i.status != constants.OP_STATUS_CANCELING |
|
1009 |
for i in job.ops[opctx.index:]) |
|
1010 |
else: |
|
1011 |
assert op.status in constants.OPS_FINALIZED |
|
921 | 1012 |
|
922 |
if op.status == constants.OP_STATUS_SUCCESS:
|
|
1013 |
if op.status == constants.OP_STATUS_QUEUED:
|
|
923 | 1014 |
finalize = False |
924 | 1015 |
|
925 |
elif op.status == constants.OP_STATUS_ERROR: |
|
926 |
# Ensure failed opcode has an exception as its result |
|
927 |
assert errors.GetEncodedError(job.ops[opctx.index].result) |
|
1016 |
opctx.CheckPriorityIncrease() |
|
928 | 1017 |
|
929 |
to_encode = errors.OpExecError("Preceding opcode failed") |
|
930 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
|
931 |
_EncodeOpError(to_encode)) |
|
932 |
finalize = True |
|
1018 |
# Keep around for another round |
|
1019 |
job.cur_opctx = opctx |
|
933 | 1020 |
|
934 |
# Consistency check |
|
935 |
assert compat.all(i.status == constants.OP_STATUS_ERROR and |
|
936 |
errors.GetEncodedError(i.result) |
|
937 |
for i in job.ops[opctx.index:]) |
|
1021 |
assert (op.priority <= constants.OP_PRIO_LOWEST and |
|
1022 |
op.priority >= constants.OP_PRIO_HIGHEST) |
|
938 | 1023 |
|
939 |
elif op.status == constants.OP_STATUS_CANCELING: |
|
940 |
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, |
|
941 |
"Job canceled by request") |
|
942 |
finalize = True |
|
1024 |
# In no case must the status be finalized here |
|
1025 |
assert job.CalcStatus() == constants.JOB_STATUS_QUEUED |
|
943 | 1026 |
|
944 |
elif op.status == constants.OP_STATUS_CANCELED: |
|
945 |
finalize = True |
|
1027 |
queue.UpdateJobUnlocked(job) |
|
946 | 1028 |
|
947 | 1029 |
else: |
948 |
raise errors.ProgrammerError("Unknown status '%s'" % op.status) |
|
1030 |
# Ensure all opcodes so far have been successful |
|
1031 |
assert (opctx.index == 0 or |
|
1032 |
compat.all(i.status == constants.OP_STATUS_SUCCESS |
|
1033 |
for i in job.ops[:opctx.index])) |
|
1034 |
|
|
1035 |
# Reset context |
|
1036 |
job.cur_opctx = None |
|
1037 |
|
|
1038 |
if op.status == constants.OP_STATUS_SUCCESS: |
|
1039 |
finalize = False |
|
1040 |
|
|
1041 |
elif op.status == constants.OP_STATUS_ERROR: |
|
1042 |
# Ensure failed opcode has an exception as its result |
|
1043 |
assert errors.GetEncodedError(job.ops[opctx.index].result) |
|
1044 |
|
|
1045 |
to_encode = errors.OpExecError("Preceding opcode failed") |
|
1046 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
|
1047 |
_EncodeOpError(to_encode)) |
|
1048 |
finalize = True |
|
949 | 1049 |
|
950 |
# Finalizing or last opcode?
|
|
951 |
if finalize or opctx.index == (opcount - 1):
|
|
952 |
# All opcodes have been run, finalize job
|
|
953 |
job.end_timestamp = TimeStampNow()
|
|
1050 |
# Consistency check
|
|
1051 |
assert compat.all(i.status == constants.OP_STATUS_ERROR and
|
|
1052 |
errors.GetEncodedError(i.result)
|
|
1053 |
for i in job.ops[opctx.index:])
|
|
954 | 1054 |
|
955 |
# Write to disk. If the job status is final, this is the final write |
|
956 |
# allowed. Once the file has been written, it can be archived anytime. |
|
957 |
queue.UpdateJobUnlocked(job) |
|
1055 |
elif op.status == constants.OP_STATUS_CANCELING: |
|
1056 |
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, |
|
1057 |
"Job canceled by request") |
|
1058 |
finalize = True |
|
1059 |
|
|
1060 |
elif op.status == constants.OP_STATUS_CANCELED: |
|
1061 |
finalize = True |
|
1062 |
|
|
1063 |
else: |
|
1064 |
raise errors.ProgrammerError("Unknown status '%s'" % op.status) |
|
1065 |
|
|
1066 |
# Finalizing or last opcode? |
|
1067 |
if finalize or opctx.index == (opcount - 1): |
|
1068 |
# All opcodes have been run, finalize job |
|
1069 |
job.end_timestamp = TimeStampNow() |
|
1070 |
|
|
1071 |
# Write to disk. If the job status is final, this is the final write |
|
1072 |
# allowed. Once the file has been written, it can be archived anytime. |
|
1073 |
queue.UpdateJobUnlocked(job) |
|
958 | 1074 |
|
959 |
if finalize or opctx.index == (opcount - 1): |
|
960 |
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) |
|
961 |
return True |
|
1075 |
if finalize or opctx.index == (opcount - 1):
|
|
1076 |
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
|
|
1077 |
return True
|
|
962 | 1078 |
|
963 | 1079 |
return False |
964 | 1080 |
finally: |
... | ... | |
988 | 1104 |
|
989 | 1105 |
if not _JobProcessor(queue, proc.ExecOpCode, job)(): |
990 | 1106 |
# Schedule again |
991 |
raise workerpool.DeferTask() |
|
1107 |
raise workerpool.DeferTask(priority=job.CalcPriority())
|
|
992 | 1108 |
|
993 | 1109 |
|
994 | 1110 |
class _JobQueueWorkerPool(workerpool.WorkerPool): |
Also available in: Unified diff