Revision 66bd7445

b/lib/jqueue.py
426 426
      op.result = result
427 427
      not_marked = False
428 428

  
429
  def Finalize(self):
430
    """Marks the job as finalized.
431

  
432
    """
433
    self.end_timestamp = TimeStampNow()
434

  
429 435
  def Cancel(self):
430 436
    """Marks job as canceled/-ing if possible.
431 437

  
......
439 445
    if status == constants.JOB_STATUS_QUEUED:
440 446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441 447
                             "Job canceled by request")
448
      self.Finalize()
442 449
      return (True, "Job %s canceled" % self.id)
443 450

  
444 451
    elif status == constants.JOB_STATUS_WAITLOCK:
......
866 873
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867 874
                             timeout_strategy_factory)
868 875

  
869
      if op.status == constants.OP_STATUS_CANCELED:
870
        # Cancelled jobs are handled by the caller
871
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872
                              for i in job.ops[idx:])
873

  
874
      elif op.status in constants.OPS_FINALIZED:
875
        # This is a job that was partially completed before master daemon
876
        # shutdown, so it can be expected that some opcodes are already
877
        # completed successfully (if any did error out, then the whole job
878
        # should have been aborted and not resubmitted for processing).
879
        logging.info("%s: opcode %s already processed, skipping",
880
                     opctx.log_prefix, opctx.summary)
881
        continue
876
      if op.status not in constants.OPS_FINALIZED:
877
        return opctx
882 878

  
883
      return opctx
879
      # This is a job that was partially completed before master daemon
880
      # shutdown, so it can be expected that some opcodes are already
881
      # completed successfully (if any did error out, then the whole job
882
      # should have been aborted and not resubmitted for processing).
883
      logging.info("%s: opcode %s already processed, skipping",
884
                   opctx.log_prefix, opctx.summary)
884 885

  
885 886
  @staticmethod
886 887
  def _MarkWaitlock(job, op):
......
977 978
    try:
978 979
      opcount = len(job.ops)
979 980

  
981
      # Don't do anything for finalized jobs
982
      if job.CalcStatus() in constants.JOBS_FINALIZED:
983
        return True
984

  
980 985
      # Is a previous opcode still pending?
981 986
      if job.cur_opctx:
982 987
        opctx = job.cur_opctx
......
990 995

  
991 996
      # Consistency check
992 997
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
993
                                     constants.OP_STATUS_CANCELING,
994
                                     constants.OP_STATUS_CANCELED)
998
                                     constants.OP_STATUS_CANCELING)
995 999
                        for i in job.ops[opctx.index + 1:])
996 1000

  
997 1001
      assert op.status in (constants.OP_STATUS_QUEUED,
998 1002
                           constants.OP_STATUS_WAITLOCK,
999
                           constants.OP_STATUS_CANCELING,
1000
                           constants.OP_STATUS_CANCELED)
1003
                           constants.OP_STATUS_CANCELING)
1001 1004

  
1002 1005
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1003 1006
              op.priority >= constants.OP_PRIO_HIGHEST)
1004 1007

  
1005
      if op.status not in (constants.OP_STATUS_CANCELING,
1006
                           constants.OP_STATUS_CANCELED):
1008
      if op.status != constants.OP_STATUS_CANCELING:
1007 1009
        assert op.status in (constants.OP_STATUS_QUEUED,
1008 1010
                             constants.OP_STATUS_WAITLOCK)
1009 1011

  
......
1088 1090
                                "Job canceled by request")
1089 1091
          finalize = True
1090 1092

  
1091
        elif op.status == constants.OP_STATUS_CANCELED:
1092
          finalize = True
1093

  
1094 1093
        else:
1095 1094
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1096 1095

  
1097
        # Finalizing or last opcode?
1098
        if finalize or opctx.index == (opcount - 1):
1096
        if opctx.index == (opcount - 1):
1097
          # Finalize on last opcode
1098
          finalize = True
1099

  
1100
        if finalize:
1099 1101
          # All opcodes have been run, finalize job
1100
          job.end_timestamp = TimeStampNow()
1102
          job.Finalize()
1101 1103

  
1102 1104
        # Write to disk. If the job status is final, this is the final write
1103 1105
        # allowed. Once the file has been written, it can be archived anytime.
1104 1106
        queue.UpdateJobUnlocked(job)
1105 1107

  
1106
        if finalize or opctx.index == (opcount - 1):
1108
        if finalize:
1107 1109
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1108 1110
          return True
1109 1111

  
......
1775 1777
    @param replicate: whether to replicate the change to remote nodes
1776 1778

  
1777 1779
    """
1780
    if __debug__:
1781
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1782
      assert (finalized ^ (job.end_timestamp is None))
1783

  
1778 1784
    filename = self._GetJobPath(job.id)
1779 1785
    data = serializer.DumpJson(job.Serialize(), indent=False)
1780 1786
    logging.debug("Writing job %s to %s", job.id, filename)
......
1832 1838
    (success, msg) = job.Cancel()
1833 1839

  
1834 1840
    if success:
1841
      # If the job was finalized (e.g. cancelled), this is the final write
1842
      # allowed. The job can be archived anytime.
1835 1843
      self.UpdateJobUnlocked(job)
1836 1844

  
1837 1845
    return (success, msg)
b/test/ganeti.jqueue_unittest.py
565 565

  
566 566
      self._GenericCheckJob(job)
567 567

  
568
      # Finished jobs can't be processed any further
569
      self.assertRaises(errors.ProgrammerError,
570
                        jqueue._JobProcessor(queue, opexec, job))
568
      # Calling the processor on a finished job should be a no-op
569
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
570
      self.assertRaises(IndexError, queue.GetNextUpdate)
571 571

  
572 572
  def testOpcodeError(self):
573 573
    queue = _FakeQueueForProc()
......
643 643

  
644 644
      self._GenericCheckJob(job)
645 645

  
646
      # Finished jobs can't be processed any further
647
      self.assertRaises(errors.ProgrammerError,
648
                        jqueue._JobProcessor(queue, opexec, job))
646
      # Calling the processor on a finished job should be a no-op
647
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
648
      self.assertRaises(IndexError, queue.GetNextUpdate)
649 649

  
650 650
  def testCancelWhileInQueue(self):
651 651
    queue = _FakeQueueForProc()
......
665 665

  
666 666
    self.assertRaises(IndexError, queue.GetNextUpdate)
667 667

  
668
    self.assertFalse(job.start_timestamp)
669
    self.assertTrue(job.end_timestamp)
668 670
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
669 671
                            for op in job.ops))
670 672

  
673
    # Serialize to check for differences
674
    before_proc = job.Serialize()
675

  
676
    # Simulate processor called in workerpool
671 677
    opexec = _FakeExecOpCodeForProc(queue, None, None)
672 678
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
673 679

  
......
675 681
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
676 682
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
677 683
    self.assertFalse(job.start_timestamp)
678
    self.assert_(job.end_timestamp)
684
    self.assertTrue(job.end_timestamp)
679 685
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
680 686
                                for op in job.ops))
681 687
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
682 688
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
683 689
                      ["Job canceled by request" for _ in job.ops]])
684 690

  
691
    # Must not have changed or written
692
    self.assertEqual(before_proc, job.Serialize())
693
    self.assertRaises(IndexError, queue.GetNextUpdate)
694

  
685 695
  def testCancelWhileWaitlockInQueue(self):
686 696
    queue = _FakeQueueForProc()
687 697

  
......
903 913

  
904 914
    for remaining in reversed(range(len(job.ops) - successcount)):
905 915
      result = jqueue._JobProcessor(queue, opexec, job)()
916
      self.assertEqual(queue.GetNextUpdate(), (job, True))
917
      self.assertEqual(queue.GetNextUpdate(), (job, True))
918
      self.assertEqual(queue.GetNextUpdate(), (job, True))
919
      self.assertRaises(IndexError, queue.GetNextUpdate)
906 920

  
907 921
      if remaining == 0:
908 922
        # Last opcode
......
913 927

  
914 928
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
915 929

  
930
    self.assertRaises(IndexError, queue.GetNextUpdate)
916 931
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
917 932
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
918 933
    self.assertEqual(job.GetInfo(["opresult"]),
......
924 939

  
925 940
    self._GenericCheckJob(job)
926 941

  
927
    # Finished jobs can't be processed any further
928
    self.assertRaises(errors.ProgrammerError,
929
                      jqueue._JobProcessor(queue, opexec, job))
942
    # Calling the processor on a finished job should be a no-op
943
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
944
    self.assertRaises(IndexError, queue.GetNextUpdate)
930 945

  
931 946
    # ... also after being restored
932 947
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
933
    self.assertRaises(errors.ProgrammerError,
934
                      jqueue._JobProcessor(queue, opexec, job2))
948
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
949
    self.assertRaises(IndexError, queue.GetNextUpdate)
935 950

  
936 951
  def testProcessorOnRunningJob(self):
937 952
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
......
1293 1308
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1294 1309
                            for op in job.ops))
1295 1310

  
1296
    # Finished jobs can't be processed any further
1297
    self.assertRaises(errors.ProgrammerError,
1298
                      jqueue._JobProcessor(self.queue, opexec, job))
1311
    # Calling the processor on a finished job should be a no-op
1312
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1313
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1299 1314

  
1300 1315

  
1301 1316
if __name__ == "__main__":

Also available in: Unified diff