Revision b80cc518

b/lib/jqueue.py
745 745
  return errors.EncodeException(to_encode)
746 746

  
747 747

  
748
class _OpExecContext:
749
  def __init__(self, op, index, log_prefix):
750
    """Initializes this class.
751

  
752
    """
753
    self.op = op
754
    self.index = index
755
    self.log_prefix = log_prefix
756
    self.summary = op.input.Summary()
757

  
758

  
748 759
class _JobProcessor(object):
749 760
  def __init__(self, queue, opexec_fn, job):
750 761
    """Initializes this class.
......
780 791
        # Found an opcode already marked as running
781 792
        raise errors.ProgrammerError("Called for job marked as running")
782 793

  
783
      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
784
      summary = op.input.Summary()
794
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
785 795

  
786 796
      if op.status == constants.OP_STATUS_CANCELED:
787 797
        # Cancelled jobs are handled by the caller
......
794 804
        # completed successfully (if any did error out, then the whole job
795 805
        # should have been aborted and not resubmitted for processing).
796 806
        logging.info("%s: opcode %s already processed, skipping",
797
                     log_prefix, summary)
807
                     opctx.log_prefix, opctx.summary)
798 808
        continue
799 809

  
800
      return (idx, op, log_prefix, summary)
810
      return opctx
801 811

  
802 812
  @staticmethod
803 813
  def _MarkWaitlock(job, op):
......
820 830
    if job.start_timestamp is None:
821 831
      job.start_timestamp = op.start_timestamp
822 832

  
823
  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
833
  def _ExecOpCodeUnlocked(self, opctx):
824 834
    """Processes one opcode and returns the result.
825 835

  
826 836
    """
837
    op = opctx.op
838

  
827 839
    assert op.status == constants.OP_STATUS_WAITLOCK
828 840

  
829 841
    try:
......
831 843
      result = self.opexec_fn(op.input,
832 844
                              _OpExecCallbacks(self.queue, self.job, op))
833 845
    except CancelJob:
834
      logging.exception("%s: Canceling job", log_prefix)
846
      logging.exception("%s: Canceling job", opctx.log_prefix)
835 847
      assert op.status == constants.OP_STATUS_CANCELING
836 848
      return (constants.OP_STATUS_CANCELING, None)
837 849
    except Exception, err: # pylint: disable-msg=W0703
838
      logging.exception("%s: Caught exception in %s", log_prefix, summary)
850
      logging.exception("%s: Caught exception in %s",
851
                        opctx.log_prefix, opctx.summary)
839 852
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
840 853
    else:
841
      logging.debug("%s: %s successful", log_prefix, summary)
854
      logging.debug("%s: %s successful",
855
                    opctx.log_prefix, opctx.summary)
842 856
      return (constants.OP_STATUS_SUCCESS, result)
843 857

  
844 858
  def __call__(self):
......
858 872
    try:
859 873
      opcount = len(job.ops)
860 874

  
861
      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)
875
      opctx = self._FindNextOpcode(job)
876
      op = opctx.op
862 877

  
863 878
      # Consistency check
864 879
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
865 880
                                     constants.OP_STATUS_CANCELED)
866
                        for i in job.ops[opidx:])
881
                        for i in job.ops[opctx.index:])
867 882

  
868 883
      assert op.status in (constants.OP_STATUS_QUEUED,
869 884
                           constants.OP_STATUS_WAITLOCK,
......
879 894
        # Write to disk
880 895
        queue.UpdateJobUnlocked(job)
881 896

  
882
        logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)
897
        logging.info("%s: opcode %s waiting for locks",
898
                     opctx.log_prefix, opctx.summary)
883 899

  
884 900
        queue.release()
885 901
        try:
886
          (op_status, op_result) = \
887
            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
902
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
888 903
        finally:
889 904
          queue.acquire(shared=1)
890 905

  
......
895 910

  
896 911
        if op.status == constants.OP_STATUS_CANCELING:
897 912
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
898
                                for i in job.ops[opidx:])
913
                                for i in job.ops[opctx.index:])
899 914
        else:
900 915
          assert op.status in constants.OPS_FINALIZED
901 916

  
902 917
      # Ensure all opcodes so far have been successful
903
      assert (opidx == 0 or
918
      assert (opctx.index == 0 or
904 919
              compat.all(i.status == constants.OP_STATUS_SUCCESS
905
                         for i in job.ops[:opidx]))
920
                         for i in job.ops[:opctx.index]))
906 921

  
907 922
      if op.status == constants.OP_STATUS_SUCCESS:
908 923
        finalize = False
909 924

  
910 925
      elif op.status == constants.OP_STATUS_ERROR:
911 926
        # Ensure failed opcode has an exception as its result
912
        assert errors.GetEncodedError(job.ops[opidx].result)
927
        assert errors.GetEncodedError(job.ops[opctx.index].result)
913 928

  
914 929
        to_encode = errors.OpExecError("Preceding opcode failed")
915 930
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
......
919 934
        # Consistency check
920 935
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
921 936
                          errors.GetEncodedError(i.result)
922
                          for i in job.ops[opidx:])
937
                          for i in job.ops[opctx.index:])
923 938

  
924 939
      elif op.status == constants.OP_STATUS_CANCELING:
925 940
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
......
933 948
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
934 949

  
935 950
      # Finalizing or last opcode?
936
      if finalize or opidx == (opcount - 1):
951
      if finalize or opctx.index == (opcount - 1):
937 952
        # All opcodes have been run, finalize job
938 953
        job.end_timestamp = TimeStampNow()
939 954

  
......
941 956
      # allowed. Once the file has been written, it can be archived anytime.
942 957
      queue.UpdateJobUnlocked(job)
943 958

  
944
      if finalize or opidx == (opcount - 1):
959
      if finalize or opctx.index == (opcount - 1):
945 960
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
946 961
        return True
947 962

  

Also available in: Unified diff