Revision 26d3fd2f lib/jqueue.py

b/lib/jqueue.py
176 176

  
177 177
  """
178 178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter",
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 180
               "received_timestamp", "start_timestamp", "end_timestamp",
181 181
               "__weakref__"]
182 182

  
......
211 211

  
212 212
    """
213 213
    obj.ops_iter = None
214
    obj.cur_opctx = None
214 215

  
215 216
  def __repr__(self):
216 217
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
......
745 746
  return errors.EncodeException(to_encode)
746 747

  
747 748

  
749
class _TimeoutStrategyWrapper:
750
  def __init__(self, fn):
751
    """Initializes this class.
752

  
753
    """
754
    self._fn = fn
755
    self._next = None
756

  
757
  def _Advance(self):
758
    """Gets the next timeout if necessary.
759

  
760
    """
761
    if self._next is None:
762
      self._next = self._fn()
763

  
764
  def Peek(self):
765
    """Returns the next timeout.
766

  
767
    """
768
    self._Advance()
769
    return self._next
770

  
771
  def Next(self):
772
    """Returns the current timeout and advances the internal state.
773

  
774
    """
775
    self._Advance()
776
    result = self._next
777
    self._next = None
778
    return result
779

  
780

  
748 781
class _OpExecContext:
749
  def __init__(self, op, index, log_prefix):
782
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
750 783
    """Initializes this class.
751 784

  
752 785
    """
......
755 788
    self.log_prefix = log_prefix
756 789
    self.summary = op.input.Summary()
757 790

  
791
    self._timeout_strategy_factory = timeout_strategy_factory
792
    self._ResetTimeoutStrategy()
793

  
794
  def _ResetTimeoutStrategy(self):
    """Replaces the current timeout strategy with a newly created one.

    A new strategy object is requested from the factory passed to the
    constructor; its C{NextAttempt} method is wrapped to allow peeking.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)
800

  
801
  def CheckPriorityIncrease(self):
    """Decides whether the opcode's priority should be raised, and does so.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: True if the priority was increased (in which case the timeout
             strategy was reset as well), False otherwise

    """
    # Retries left for the current priority? (None means the next attempt
    # would be a blocking acquire)
    if self._timeout_strategy.Peek() is not None:
      return False

    opcode = self.op

    # Already at the highest priority, nothing left to increase
    if opcode.priority <= constants.OP_PRIO_HIGHEST:
      return False

    logging.debug("Increasing priority")
    # Numerically lower values mean higher priority
    opcode.priority -= 1
    self._ResetTimeoutStrategy()

    return True
819

  
820
  def GetNextLockTimeout(self):
    """Fetches the timeout to use for the next lock acquire attempt.

    The value is consumed from the current timeout strategy.

    """
    timeout = self._timeout_strategy.Next()
    return timeout
825

  
758 826

  
759 827
class _JobProcessor(object):
760
  def __init__(self, queue, opexec_fn, job):
828
  def __init__(self, queue, opexec_fn, job,
829
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
761 830
    """Initializes this class.
762 831

  
763 832
    """
764 833
    self.queue = queue
765 834
    self.opexec_fn = opexec_fn
766 835
    self.job = job
836
    self._timeout_strategy_factory = _timeout_strategy_factory
767 837

  
768 838
  @staticmethod
769
  def _FindNextOpcode(job):
839
  def _FindNextOpcode(job, timeout_strategy_factory):
770 840
    """Locates the next opcode to run.
771 841

  
772 842
    @type job: L{_QueuedJob}
773 843
    @param job: Job object
844
    @param timeout_strategy_factory: Callable to create new timeout strategy
774 845

  
775 846
    """
776 847
    # Create some sort of a cache to speed up locating next opcode for future
......
791 862
        # Found an opcode already marked as running
792 863
        raise errors.ProgrammerError("Called for job marked as running")
793 864

  
794
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
865
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
866
                             timeout_strategy_factory)
795 867

  
796 868
      if op.status == constants.OP_STATUS_CANCELED:
797 869
        # Cancelled jobs are handled by the caller
......
838 910

  
839 911
    assert op.status == constants.OP_STATUS_WAITLOCK
840 912

  
913
    timeout = opctx.GetNextLockTimeout()
914

  
841 915
    try:
842 916
      # Make sure not to hold queue lock while calling ExecOpCode
843 917
      result = self.opexec_fn(op.input,
844
                              _OpExecCallbacks(self.queue, self.job, op))
918
                              _OpExecCallbacks(self.queue, self.job, op),
919
                              timeout=timeout)
920
    except mcpu.LockAcquireTimeout:
921
      assert timeout is not None, "Received timeout for blocking acquire"
922
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
923
      assert op.status == constants.OP_STATUS_WAITLOCK
924
      return (constants.OP_STATUS_QUEUED, None)
845 925
    except CancelJob:
846 926
      logging.exception("%s: Canceling job", opctx.log_prefix)
847 927
      assert op.status == constants.OP_STATUS_CANCELING
......
855 935
                    opctx.log_prefix, opctx.summary)
856 936
      return (constants.OP_STATUS_SUCCESS, result)
857 937

  
858
  def __call__(self):
938
  def __call__(self, _nextop_fn=None):
859 939
    """Continues execution of a job.
860 940

  
941
    @param _nextop_fn: Callback function for tests
861 942
    @rtype: bool
862 943
    @return: True if job is finished, False if processor needs to be called
863 944
             again
......
872 953
    try:
873 954
      opcount = len(job.ops)
874 955

  
875
      opctx = self._FindNextOpcode(job)
956
      # Is a previous opcode still pending?
957
      if job.cur_opctx:
958
        opctx = job.cur_opctx
959
      else:
960
        if __debug__ and _nextop_fn:
961
          _nextop_fn()
962
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
963

  
876 964
      op = opctx.op
877 965

  
878 966
      # Consistency check
......
884 972
                           constants.OP_STATUS_WAITLOCK,
885 973
                           constants.OP_STATUS_CANCELED)
886 974

  
975
      assert (op.priority <= constants.OP_PRIO_LOWEST and
976
              op.priority >= constants.OP_PRIO_HIGHEST)
977

  
887 978
      if op.status != constants.OP_STATUS_CANCELED:
888 979
        # Prepare to start opcode
889 980
        self._MarkWaitlock(job, op)
......
903 994
        finally:
904 995
          queue.acquire(shared=1)
905 996

  
906
        # Finalize opcode
907
        op.end_timestamp = TimeStampNow()
908 997
        op.status = op_status
909 998
        op.result = op_result
910 999

  
911
        if op.status == constants.OP_STATUS_CANCELING:
912
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
913
                                for i in job.ops[opctx.index:])
1000
        if op.status == constants.OP_STATUS_QUEUED:
1001
          # Couldn't get locks in time
1002
          assert not op.end_timestamp
914 1003
        else:
915
          assert op.status in constants.OPS_FINALIZED
1004
          # Finalize opcode
1005
          op.end_timestamp = TimeStampNow()
916 1006

  
917
      # Ensure all opcodes so far have been successful
918
      assert (opctx.index == 0 or
919
              compat.all(i.status == constants.OP_STATUS_SUCCESS
920
                         for i in job.ops[:opctx.index]))
1007
          if op.status == constants.OP_STATUS_CANCELING:
1008
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1009
                                  for i in job.ops[opctx.index:])
1010
          else:
1011
            assert op.status in constants.OPS_FINALIZED
921 1012

  
922
      if op.status == constants.OP_STATUS_SUCCESS:
1013
      if op.status == constants.OP_STATUS_QUEUED:
923 1014
        finalize = False
924 1015

  
925
      elif op.status == constants.OP_STATUS_ERROR:
926
        # Ensure failed opcode has an exception as its result
927
        assert errors.GetEncodedError(job.ops[opctx.index].result)
1016
        opctx.CheckPriorityIncrease()
928 1017

  
929
        to_encode = errors.OpExecError("Preceding opcode failed")
930
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
931
                              _EncodeOpError(to_encode))
932
        finalize = True
1018
        # Keep around for another round
1019
        job.cur_opctx = opctx
933 1020

  
934
        # Consistency check
935
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
936
                          errors.GetEncodedError(i.result)
937
                          for i in job.ops[opctx.index:])
1021
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1022
                op.priority >= constants.OP_PRIO_HIGHEST)
938 1023

  
939
      elif op.status == constants.OP_STATUS_CANCELING:
940
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
941
                              "Job canceled by request")
942
        finalize = True
1024
        # In no case must the status be finalized here
1025
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
943 1026

  
944
      elif op.status == constants.OP_STATUS_CANCELED:
945
        finalize = True
1027
        queue.UpdateJobUnlocked(job)
946 1028

  
947 1029
      else:
948
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1030
        # Ensure all opcodes so far have been successful
1031
        assert (opctx.index == 0 or
1032
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1033
                           for i in job.ops[:opctx.index]))
1034

  
1035
        # Reset context
1036
        job.cur_opctx = None
1037

  
1038
        if op.status == constants.OP_STATUS_SUCCESS:
1039
          finalize = False
1040

  
1041
        elif op.status == constants.OP_STATUS_ERROR:
1042
          # Ensure failed opcode has an exception as its result
1043
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1044

  
1045
          to_encode = errors.OpExecError("Preceding opcode failed")
1046
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1047
                                _EncodeOpError(to_encode))
1048
          finalize = True
949 1049

  
950
      # Finalizing or last opcode?
951
      if finalize or opctx.index == (opcount - 1):
952
        # All opcodes have been run, finalize job
953
        job.end_timestamp = TimeStampNow()
1050
          # Consistency check
1051
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1052
                            errors.GetEncodedError(i.result)
1053
                            for i in job.ops[opctx.index:])
954 1054

  
955
      # Write to disk. If the job status is final, this is the final write
956
      # allowed. Once the file has been written, it can be archived anytime.
957
      queue.UpdateJobUnlocked(job)
1055
        elif op.status == constants.OP_STATUS_CANCELING:
1056
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1057
                                "Job canceled by request")
1058
          finalize = True
1059

  
1060
        elif op.status == constants.OP_STATUS_CANCELED:
1061
          finalize = True
1062

  
1063
        else:
1064
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1065

  
1066
        # Finalizing or last opcode?
1067
        if finalize or opctx.index == (opcount - 1):
1068
          # All opcodes have been run, finalize job
1069
          job.end_timestamp = TimeStampNow()
1070

  
1071
        # Write to disk. If the job status is final, this is the final write
1072
        # allowed. Once the file has been written, it can be archived anytime.
1073
        queue.UpdateJobUnlocked(job)
958 1074

  
959
      if finalize or opctx.index == (opcount - 1):
960
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
961
        return True
1075
        if finalize or opctx.index == (opcount - 1):
1076
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1077
          return True
962 1078

  
963 1079
      return False
964 1080
    finally:
......
988 1104

  
989 1105
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
990 1106
      # Schedule again
991
      raise workerpool.DeferTask()
1107
      raise workerpool.DeferTask(priority=job.CalcPriority())
992 1108

  
993 1109

  
994 1110
class _JobQueueWorkerPool(workerpool.WorkerPool):

Also available in: Unified diff