Revision 5fd6b694 lib/jqueue.py

b/lib/jqueue.py
895 895

  
896 896
    """
897 897
    assert op in job.ops
898
    assert op.status in (constants.OP_STATUS_QUEUED,
899
                         constants.OP_STATUS_WAITLOCK)
900

  
901
    update = False
898 902

  
899
    op.status = constants.OP_STATUS_WAITLOCK
900 903
    op.result = None
901
    op.start_timestamp = TimeStampNow()
904

  
905
    if op.status == constants.OP_STATUS_QUEUED:
906
      op.status = constants.OP_STATUS_WAITLOCK
907
      update = True
908

  
909
    if op.start_timestamp is None:
910
      op.start_timestamp = TimeStampNow()
911
      update = True
902 912

  
903 913
    if job.start_timestamp is None:
904 914
      job.start_timestamp = op.start_timestamp
915
      update = True
916

  
917
    assert op.status == constants.OP_STATUS_WAITLOCK
918

  
919
    return update
905 920

  
906 921
  def _ExecOpCodeUnlocked(self, opctx):
907 922
    """Processes one opcode and returns the result.
......
929 944
      if op.status == constants.OP_STATUS_CANCELING:
930 945
        return (constants.OP_STATUS_CANCELING, None)
931 946

  
932
      return (constants.OP_STATUS_QUEUED, None)
947
      # Stay in waitlock while trying to re-acquire lock
948
      return (constants.OP_STATUS_WAITLOCK, None)
933 949
    except CancelJob:
934 950
      logging.exception("%s: Canceling job", opctx.log_prefix)
935 951
      assert op.status == constants.OP_STATUS_CANCELING
......
964 980
      # Is a previous opcode still pending?
965 981
      if job.cur_opctx:
966 982
        opctx = job.cur_opctx
983
        job.cur_opctx = None
967 984
      else:
968 985
        if __debug__ and _nextop_fn:
969 986
          _nextop_fn()
......
974 991
      # Consistency check
975 992
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
976 993
                                     constants.OP_STATUS_CANCELED)
977
                        for i in job.ops[opctx.index:])
994
                        for i in job.ops[opctx.index + 1:])
978 995

  
979 996
      assert op.status in (constants.OP_STATUS_QUEUED,
980 997
                           constants.OP_STATUS_WAITLOCK,
......
985 1002

  
986 1003
      if op.status != constants.OP_STATUS_CANCELED:
987 1004
        # Prepare to start opcode
988
        self._MarkWaitlock(job, op)
1005
        if self._MarkWaitlock(job, op):
1006
          # Write to disk
1007
          queue.UpdateJobUnlocked(job)
989 1008

  
990 1009
        assert op.status == constants.OP_STATUS_WAITLOCK
991 1010
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
992

  
993
        # Write to disk
994
        queue.UpdateJobUnlocked(job)
1011
        assert job.start_timestamp and op.start_timestamp
995 1012

  
996 1013
        logging.info("%s: opcode %s waiting for locks",
997 1014
                     opctx.log_prefix, opctx.summary)
......
1005 1022
        op.status = op_status
1006 1023
        op.result = op_result
1007 1024

  
1008
        if op.status == constants.OP_STATUS_QUEUED:
1025
        if op.status == constants.OP_STATUS_WAITLOCK:
1009 1026
          # Couldn't get locks in time
1010 1027
          assert not op.end_timestamp
1011 1028
        else:
......
1018 1035
          else:
1019 1036
            assert op.status in constants.OPS_FINALIZED
1020 1037

  
1021
      if op.status == constants.OP_STATUS_QUEUED:
1038
      if op.status == constants.OP_STATUS_WAITLOCK:
1022 1039
        finalize = False
1023 1040

  
1024
        opctx.CheckPriorityIncrease()
1041
        if opctx.CheckPriorityIncrease():
1042
          # Priority was changed, need to update on-disk file
1043
          queue.UpdateJobUnlocked(job)
1025 1044

  
1026 1045
        # Keep around for another round
1027 1046
        job.cur_opctx = opctx
......
1030 1049
                op.priority >= constants.OP_PRIO_HIGHEST)
1031 1050

  
1032 1051
        # In no case must the status be finalized here
1033
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1034

  
1035
        queue.UpdateJobUnlocked(job)
1052
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1036 1053

  
1037 1054
      else:
1038 1055
        # Ensure all opcodes so far have been successful

Also available in: Unified diff