Revision 66bd7445 lib/jqueue.py

b/lib/jqueue.py
426 426
      op.result = result
427 427
      not_marked = False
428 428

  
429
  def Finalize(self):
430
    """Marks the job as finalized.
431

  
432
    """
433
    self.end_timestamp = TimeStampNow()
434

  
429 435
  def Cancel(self):
430 436
    """Marks job as canceled/-ing if possible.
431 437

  
......
439 445
    if status == constants.JOB_STATUS_QUEUED:
440 446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441 447
                             "Job canceled by request")
448
      self.Finalize()
442 449
      return (True, "Job %s canceled" % self.id)
443 450

  
444 451
    elif status == constants.JOB_STATUS_WAITLOCK:
......
866 873
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867 874
                             timeout_strategy_factory)
868 875

  
869
      if op.status == constants.OP_STATUS_CANCELED:
870
        # Cancelled jobs are handled by the caller
871
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872
                              for i in job.ops[idx:])
873

  
874
      elif op.status in constants.OPS_FINALIZED:
875
        # This is a job that was partially completed before master daemon
876
        # shutdown, so it can be expected that some opcodes are already
877
        # completed successfully (if any did error out, then the whole job
878
        # should have been aborted and not resubmitted for processing).
879
        logging.info("%s: opcode %s already processed, skipping",
880
                     opctx.log_prefix, opctx.summary)
881
        continue
876
      if op.status not in constants.OPS_FINALIZED:
877
        return opctx
882 878

  
883
      return opctx
879
      # This is a job that was partially completed before master daemon
880
      # shutdown, so it can be expected that some opcodes are already
881
      # completed successfully (if any did error out, then the whole job
882
      # should have been aborted and not resubmitted for processing).
883
      logging.info("%s: opcode %s already processed, skipping",
884
                   opctx.log_prefix, opctx.summary)
884 885

  
885 886
  @staticmethod
886 887
  def _MarkWaitlock(job, op):
......
977 978
    try:
978 979
      opcount = len(job.ops)
979 980

  
981
      # Don't do anything for finalized jobs
982
      if job.CalcStatus() in constants.JOBS_FINALIZED:
983
        return True
984

  
980 985
      # Is a previous opcode still pending?
981 986
      if job.cur_opctx:
982 987
        opctx = job.cur_opctx
......
990 995

  
991 996
      # Consistency check
992 997
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
993
                                     constants.OP_STATUS_CANCELING,
994
                                     constants.OP_STATUS_CANCELED)
998
                                     constants.OP_STATUS_CANCELING)
995 999
                        for i in job.ops[opctx.index + 1:])
996 1000

  
997 1001
      assert op.status in (constants.OP_STATUS_QUEUED,
998 1002
                           constants.OP_STATUS_WAITLOCK,
999
                           constants.OP_STATUS_CANCELING,
1000
                           constants.OP_STATUS_CANCELED)
1003
                           constants.OP_STATUS_CANCELING)
1001 1004

  
1002 1005
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1003 1006
              op.priority >= constants.OP_PRIO_HIGHEST)
1004 1007

  
1005
      if op.status not in (constants.OP_STATUS_CANCELING,
1006
                           constants.OP_STATUS_CANCELED):
1008
      if op.status != constants.OP_STATUS_CANCELING:
1007 1009
        assert op.status in (constants.OP_STATUS_QUEUED,
1008 1010
                             constants.OP_STATUS_WAITLOCK)
1009 1011

  
......
1088 1090
                                "Job canceled by request")
1089 1091
          finalize = True
1090 1092

  
1091
        elif op.status == constants.OP_STATUS_CANCELED:
1092
          finalize = True
1093

  
1094 1093
        else:
1095 1094
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1096 1095

  
1097
        # Finalizing or last opcode?
1098
        if finalize or opctx.index == (opcount - 1):
1096
        if opctx.index == (opcount - 1):
1097
          # Finalize on last opcode
1098
          finalize = True
1099

  
1100
        if finalize:
1099 1101
          # All opcodes have been run, finalize job
1100
          job.end_timestamp = TimeStampNow()
1102
          job.Finalize()
1101 1103

  
1102 1104
        # Write to disk. If the job status is final, this is the final write
1103 1105
        # allowed. Once the file has been written, it can be archived anytime.
1104 1106
        queue.UpdateJobUnlocked(job)
1105 1107

  
1106
        if finalize or opctx.index == (opcount - 1):
1108
        if finalize:
1107 1109
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1108 1110
          return True
1109 1111

  
......
1775 1777
    @param replicate: whether to replicate the change to remote nodes
1776 1778

  
1777 1779
    """
1780
    if __debug__:
1781
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1782
      assert (finalized ^ (job.end_timestamp is None))
1783

  
1778 1784
    filename = self._GetJobPath(job.id)
1779 1785
    data = serializer.DumpJson(job.Serialize(), indent=False)
1780 1786
    logging.debug("Writing job %s to %s", job.id, filename)
......
1832 1838
    (success, msg) = job.Cancel()
1833 1839

  
1834 1840
    if success:
1841
      # If the job was finalized (e.g. cancelled), this is the final write
1842
      # allowed. The job can be archived anytime.
1835 1843
      self.UpdateJobUnlocked(job)
1836 1844

  
1837 1845
    return (success, msg)

Also available in: Unified diff