Revision b80cc518
b/lib/jqueue.py | ||
---|---|---|
745 | 745 |
return errors.EncodeException(to_encode) |
746 | 746 |
|
747 | 747 |
|
748 |
class _OpExecContext: |
|
749 |
def __init__(self, op, index, log_prefix): |
|
750 |
"""Initializes this class. |
|
751 |
|
|
752 |
""" |
|
753 |
self.op = op |
|
754 |
self.index = index |
|
755 |
self.log_prefix = log_prefix |
|
756 |
self.summary = op.input.Summary() |
|
757 |
|
|
758 |
|
|
748 | 759 |
class _JobProcessor(object): |
749 | 760 |
def __init__(self, queue, opexec_fn, job): |
750 | 761 |
"""Initializes this class. |
... | ... | |
780 | 791 |
# Found an opcode already marked as running |
781 | 792 |
raise errors.ProgrammerError("Called for job marked as running") |
782 | 793 |
|
783 |
log_prefix = "Op %s/%s" % (idx + 1, len(job.ops)) |
|
784 |
summary = op.input.Summary() |
|
794 |
opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) |
|
785 | 795 |
|
786 | 796 |
if op.status == constants.OP_STATUS_CANCELED: |
787 | 797 |
# Cancelled jobs are handled by the caller |
... | ... | |
794 | 804 |
# completed successfully (if any did error out, then the whole job |
795 | 805 |
# should have been aborted and not resubmitted for processing). |
796 | 806 |
logging.info("%s: opcode %s already processed, skipping", |
797 |
log_prefix, summary)
|
|
807 |
opctx.log_prefix, opctx.summary)
|
|
798 | 808 |
continue |
799 | 809 |
|
800 |
return (idx, op, log_prefix, summary)
|
|
810 |
return opctx
|
|
801 | 811 |
|
802 | 812 |
@staticmethod |
803 | 813 |
def _MarkWaitlock(job, op): |
... | ... | |
820 | 830 |
if job.start_timestamp is None: |
821 | 831 |
job.start_timestamp = op.start_timestamp |
822 | 832 |
|
823 |
def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
|
|
833 |
def _ExecOpCodeUnlocked(self, opctx):
|
|
824 | 834 |
"""Processes one opcode and returns the result. |
825 | 835 |
|
826 | 836 |
""" |
837 |
op = opctx.op |
|
838 |
|
|
827 | 839 |
assert op.status == constants.OP_STATUS_WAITLOCK |
828 | 840 |
|
829 | 841 |
try: |
... | ... | |
831 | 843 |
result = self.opexec_fn(op.input, |
832 | 844 |
_OpExecCallbacks(self.queue, self.job, op)) |
833 | 845 |
except CancelJob: |
834 |
logging.exception("%s: Canceling job", log_prefix) |
|
846 |
logging.exception("%s: Canceling job", opctx.log_prefix)
|
|
835 | 847 |
assert op.status == constants.OP_STATUS_CANCELING |
836 | 848 |
return (constants.OP_STATUS_CANCELING, None) |
837 | 849 |
except Exception, err: # pylint: disable-msg=W0703 |
838 |
logging.exception("%s: Caught exception in %s", log_prefix, summary) |
|
850 |
logging.exception("%s: Caught exception in %s", |
|
851 |
opctx.log_prefix, opctx.summary) |
|
839 | 852 |
return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) |
840 | 853 |
else: |
841 |
logging.debug("%s: %s successful", log_prefix, summary) |
|
854 |
logging.debug("%s: %s successful", |
|
855 |
opctx.log_prefix, opctx.summary) |
|
842 | 856 |
return (constants.OP_STATUS_SUCCESS, result) |
843 | 857 |
|
844 | 858 |
def __call__(self): |
... | ... | |
858 | 872 |
try: |
859 | 873 |
opcount = len(job.ops) |
860 | 874 |
|
861 |
(opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job) |
|
875 |
opctx = self._FindNextOpcode(job) |
|
876 |
op = opctx.op |
|
862 | 877 |
|
863 | 878 |
# Consistency check |
864 | 879 |
assert compat.all(i.status in (constants.OP_STATUS_QUEUED, |
865 | 880 |
constants.OP_STATUS_CANCELED) |
866 |
for i in job.ops[opidx:])
|
|
881 |
for i in job.ops[opctx.index:])
|
|
867 | 882 |
|
868 | 883 |
assert op.status in (constants.OP_STATUS_QUEUED, |
869 | 884 |
constants.OP_STATUS_WAITLOCK, |
... | ... | |
879 | 894 |
# Write to disk |
880 | 895 |
queue.UpdateJobUnlocked(job) |
881 | 896 |
|
882 |
logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary) |
|
897 |
logging.info("%s: opcode %s waiting for locks", |
|
898 |
opctx.log_prefix, opctx.summary) |
|
883 | 899 |
|
884 | 900 |
queue.release() |
885 | 901 |
try: |
886 |
(op_status, op_result) = \ |
|
887 |
self._ExecOpCodeUnlocked(log_prefix, op, op_summary) |
|
902 |
(op_status, op_result) = self._ExecOpCodeUnlocked(opctx) |
|
888 | 903 |
finally: |
889 | 904 |
queue.acquire(shared=1) |
890 | 905 |
|
... | ... | |
895 | 910 |
|
896 | 911 |
if op.status == constants.OP_STATUS_CANCELING: |
897 | 912 |
assert not compat.any(i.status != constants.OP_STATUS_CANCELING |
898 |
for i in job.ops[opidx:])
|
|
913 |
for i in job.ops[opctx.index:])
|
|
899 | 914 |
else: |
900 | 915 |
assert op.status in constants.OPS_FINALIZED |
901 | 916 |
|
902 | 917 |
# Ensure all opcodes so far have been successful |
903 |
assert (opidx == 0 or
|
|
918 |
assert (opctx.index == 0 or
|
|
904 | 919 |
compat.all(i.status == constants.OP_STATUS_SUCCESS |
905 |
for i in job.ops[:opidx]))
|
|
920 |
for i in job.ops[:opctx.index]))
|
|
906 | 921 |
|
907 | 922 |
if op.status == constants.OP_STATUS_SUCCESS: |
908 | 923 |
finalize = False |
909 | 924 |
|
910 | 925 |
elif op.status == constants.OP_STATUS_ERROR: |
911 | 926 |
# Ensure failed opcode has an exception as its result |
912 |
assert errors.GetEncodedError(job.ops[opidx].result)
|
|
927 |
assert errors.GetEncodedError(job.ops[opctx.index].result)
|
|
913 | 928 |
|
914 | 929 |
to_encode = errors.OpExecError("Preceding opcode failed") |
915 | 930 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
... | ... | |
919 | 934 |
# Consistency check |
920 | 935 |
assert compat.all(i.status == constants.OP_STATUS_ERROR and |
921 | 936 |
errors.GetEncodedError(i.result) |
922 |
for i in job.ops[opidx:])
|
|
937 |
for i in job.ops[opctx.index:])
|
|
923 | 938 |
|
924 | 939 |
elif op.status == constants.OP_STATUS_CANCELING: |
925 | 940 |
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, |
... | ... | |
933 | 948 |
raise errors.ProgrammerError("Unknown status '%s'" % op.status) |
934 | 949 |
|
935 | 950 |
# Finalizing or last opcode? |
936 |
if finalize or opidx == (opcount - 1):
|
|
951 |
if finalize or opctx.index == (opcount - 1):
|
|
937 | 952 |
# All opcodes have been run, finalize job |
938 | 953 |
job.end_timestamp = TimeStampNow() |
939 | 954 |
|
... | ... | |
941 | 956 |
# allowed. Once the file has been written, it can be archived anytime. |
942 | 957 |
queue.UpdateJobUnlocked(job) |
943 | 958 |
|
944 |
if finalize or opidx == (opcount - 1):
|
|
959 |
if finalize or opctx.index == (opcount - 1):
|
|
945 | 960 |
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) |
946 | 961 |
return True |
947 | 962 |
|
Also available in: Unified diff