Revision be760ba8 lib/jqueue.py

b/lib/jqueue.py
176 176

  
177 177
  """
178 178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial",
179
  __slots__ = ["queue", "id", "ops", "log_serial", "current_op",
180 180
               "received_timestamp", "start_timestamp", "end_timestamp",
181 181
               "__weakref__"]
182 182

  
......
203 203
    self.start_timestamp = None
204 204
    self.end_timestamp = None
205 205

  
206
    self.current_op = None
207

  
206 208
  def __repr__(self):
207 209
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
208 210
              "id=%s" % self.id,
......
237 239
        obj.log_serial = max(obj.log_serial, log_entry[0])
238 240
      obj.ops.append(op)
239 241

  
242
    obj.current_op = None
243

  
240 244
    return obj
241 245

  
242 246
  def Serialize(self):
......
734 738
  return errors.EncodeException(to_encode)
735 739

  
736 740

  
741
class _JobProcessor(object):
742
  def __init__(self, queue, opexec_fn, job):
743
    """Initializes this class.
744

  
745
    """
746
    self.queue = queue
747
    self.opexec_fn = opexec_fn
748
    self.job = job
749

  
750
  @staticmethod
751
  def _FindNextOpcode(job):
752
    """Locates the next opcode to run.
753

  
754
    @type job: L{_QueuedJob}
755
    @param job: Job object
756

  
757
    """
758
    # Create some sort of a cache to speed up locating next opcode for future
759
    # lookups
760
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
761
    # pending and one for processed ops.
762
    if job.current_op is None:
763
      job.current_op = enumerate(job.ops)
764

  
765
    # Find next opcode to run
766
    while True:
767
      try:
768
        (idx, op) = job.current_op.next()
769
      except StopIteration:
770
        raise errors.ProgrammerError("Called for a finished job")
771

  
772
      if op.status == constants.OP_STATUS_RUNNING:
773
        # Found an opcode already marked as running
774
        raise errors.ProgrammerError("Called for job marked as running")
775

  
776
      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
777
      summary = op.input.Summary()
778

  
779
      if op.status == constants.OP_STATUS_CANCELED:
780
        # Cancelled jobs are handled by the caller
781
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
782
                              for i in job.ops[idx:])
783

  
784
      elif op.status in constants.OPS_FINALIZED:
785
        # This is a job that was partially completed before master daemon
786
        # shutdown, so it can be expected that some opcodes are already
787
        # completed successfully (if any did error out, then the whole job
788
        # should have been aborted and not resubmitted for processing).
789
        logging.info("%s: opcode %s already processed, skipping",
790
                     log_prefix, summary)
791
        continue
792

  
793
      return (idx, op, log_prefix, summary)
794

  
795
  @staticmethod
796
  def _MarkWaitlock(job, op):
797
    """Marks an opcode as waiting for locks.
798

  
799
    The job's start timestamp is also set if necessary.
800

  
801
    @type job: L{_QueuedJob}
802
    @param job: Job object
803
    @type op: L{_QueuedOpCode}
804
    @param op: Opcode object
805

  
806
    """
807
    assert op in job.ops
808

  
809
    op.status = constants.OP_STATUS_WAITLOCK
810
    op.result = None
811
    op.start_timestamp = TimeStampNow()
812

  
813
    if job.start_timestamp is None:
814
      job.start_timestamp = op.start_timestamp
815

  
816
  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
817
    """Processes one opcode and returns the result.
818

  
819
    """
820
    assert op.status == constants.OP_STATUS_WAITLOCK
821

  
822
    try:
823
      # Make sure not to hold queue lock while calling ExecOpCode
824
      result = self.opexec_fn(op.input,
825
                              _OpExecCallbacks(self.queue, self.job, op))
826
    except CancelJob:
827
      logging.exception("%s: Canceling job", log_prefix)
828
      assert op.status == constants.OP_STATUS_CANCELING
829
      return (constants.OP_STATUS_CANCELING, None)
830
    except Exception, err: # pylint: disable-msg=W0703
831
      logging.exception("%s: Caught exception in %s", log_prefix, summary)
832
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
833
    else:
834
      logging.debug("%s: %s successful", log_prefix, summary)
835
      return (constants.OP_STATUS_SUCCESS, result)
836

  
837
  def __call__(self):
838
    """Continues execution of a job.
839

  
840
    @rtype: bool
841
    @return: True if job is finished, False if processor needs to be called
842
             again
843

  
844
    """
845
    queue = self.queue
846
    job = self.job
847

  
848
    logging.debug("Processing job %s", job.id)
849

  
850
    queue.acquire(shared=1)
851
    try:
852
      opcount = len(job.ops)
853

  
854
      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)
855

  
856
      # Consistency check
857
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
858
                                     constants.OP_STATUS_CANCELED)
859
                        for i in job.ops[opidx:])
860

  
861
      assert op.status in (constants.OP_STATUS_QUEUED,
862
                           constants.OP_STATUS_WAITLOCK,
863
                           constants.OP_STATUS_CANCELED)
864

  
865
      if op.status != constants.OP_STATUS_CANCELED:
866
        # Prepare to start opcode
867
        self._MarkWaitlock(job, op)
868

  
869
        assert op.status == constants.OP_STATUS_WAITLOCK
870
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
871

  
872
        # Write to disk
873
        queue.UpdateJobUnlocked(job)
874

  
875
        logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)
876

  
877
        queue.release()
878
        try:
879
          (op_status, op_result) = \
880
            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
881
        finally:
882
          queue.acquire(shared=1)
883

  
884
        # Finalize opcode
885
        op.end_timestamp = TimeStampNow()
886
        op.status = op_status
887
        op.result = op_result
888

  
889
        if op.status == constants.OP_STATUS_CANCELING:
890
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
891
                                for i in job.ops[opidx:])
892
        else:
893
          assert op.status in constants.OPS_FINALIZED
894

  
895
      # Ensure all opcodes so far have been successful
896
      assert (opidx == 0 or
897
              compat.all(i.status == constants.OP_STATUS_SUCCESS
898
                         for i in job.ops[:opidx]))
899

  
900
      if op.status == constants.OP_STATUS_SUCCESS:
901
        finalize = False
902

  
903
      elif op.status == constants.OP_STATUS_ERROR:
904
        # Ensure failed opcode has an exception as its result
905
        assert errors.GetEncodedError(job.ops[opidx].result)
906

  
907
        to_encode = errors.OpExecError("Preceding opcode failed")
908
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
909
                              _EncodeOpError(to_encode))
910
        finalize = True
911

  
912
        # Consistency check
913
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
914
                          errors.GetEncodedError(i.result)
915
                          for i in job.ops[opidx:])
916

  
917
      elif op.status == constants.OP_STATUS_CANCELING:
918
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
919
                              "Job canceled by request")
920
        finalize = True
921

  
922
      elif op.status == constants.OP_STATUS_CANCELED:
923
        finalize = True
924

  
925
      else:
926
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
927

  
928
      # Finalizing or last opcode?
929
      if finalize or opidx == (opcount - 1):
930
        # All opcodes have been run, finalize job
931
        job.end_timestamp = TimeStampNow()
932

  
933
      # Write to disk. If the job status is final, this is the final write
934
      # allowed. Once the file has been written, it can be archived anytime.
935
      queue.UpdateJobUnlocked(job)
936

  
937
      if finalize or opidx == (opcount - 1):
938
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
939
        return True
940

  
941
      return False
942
    finally:
943
      queue.release()
944

  
945

  
737 946
class _JobQueueWorker(workerpool.BaseWorker):
738 947
  """The actual job workers.
739 948

  
......
741 950
  def RunTask(self, job): # pylint: disable-msg=W0221
742 951
    """Job executor.
743 952

  
744
    This functions processes a job. It is closely tied to the _QueuedJob and
745
    _QueuedOpCode classes.
953
    This function processes a job. It is closely tied to the L{_QueuedJob} and
954
    L{_QueuedOpCode} classes.
746 955

  
747 956
    @type job: L{_QueuedJob}
748 957
    @param job: the job to be processed
749 958

  
750 959
    """
960
    queue = job.queue
961
    assert queue == self.pool.queue
962

  
751 963
    self.SetTaskName("Job%s" % job.id)
752 964

  
753
    logging.info("Processing job %s", job.id)
754
    proc = mcpu.Processor(self.pool.queue.context, job.id)
755
    queue = job.queue
756
    try:
757
      try:
758
        count = len(job.ops)
759
        for idx, op in enumerate(job.ops):
760
          op_summary = op.input.Summary()
761
          if op.status == constants.OP_STATUS_SUCCESS:
762
            # this is a job that was partially completed before master
763
            # daemon shutdown, so it can be expected that some opcodes
764
            # are already completed successfully (if any did error
765
            # out, then the whole job should have been aborted and not
766
            # resubmitted for processing)
767
            logging.info("Op %s/%s: opcode %s already processed, skipping",
768
                         idx + 1, count, op_summary)
769
            continue
770
          try:
771
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
772
                         op_summary)
773

  
774
            queue.acquire(shared=1)
775
            try:
776
              if op.status == constants.OP_STATUS_CANCELED:
777
                logging.debug("Canceling opcode")
778
                raise CancelJob()
779
              assert op.status == constants.OP_STATUS_QUEUED
780
              logging.debug("Opcode %s/%s waiting for locks",
781
                            idx + 1, count)
782
              op.status = constants.OP_STATUS_WAITLOCK
783
              op.result = None
784
              op.start_timestamp = TimeStampNow()
785
              if idx == 0: # first opcode
786
                job.start_timestamp = op.start_timestamp
787
              queue.UpdateJobUnlocked(job)
788

  
789
              input_opcode = op.input
790
            finally:
791
              queue.release()
792

  
793
            # Make sure not to hold queue lock while calling ExecOpCode
794
            result = proc.ExecOpCode(input_opcode,
795
                                     _OpExecCallbacks(queue, job, op))
796

  
797
            queue.acquire(shared=1)
798
            try:
799
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
800
              op.status = constants.OP_STATUS_SUCCESS
801
              op.result = result
802
              op.end_timestamp = TimeStampNow()
803
              if idx == count - 1:
804
                job.end_timestamp = TimeStampNow()
805

  
806
                # Consistency check
807
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
808
                                  for i in job.ops)
809

  
810
              queue.UpdateJobUnlocked(job)
811
            finally:
812
              queue.release()
813

  
814
            logging.info("Op %s/%s: Successfully finished opcode %s",
815
                         idx + 1, count, op_summary)
816
          except CancelJob:
817
            # Will be handled further up
818
            raise
819
          except Exception, err:
820
            queue.acquire(shared=1)
821
            try:
822
              try:
823
                logging.debug("Opcode %s/%s failed", idx + 1, count)
824
                op.status = constants.OP_STATUS_ERROR
825
                op.result = _EncodeOpError(err)
826
                op.end_timestamp = TimeStampNow()
827
                logging.info("Op %s/%s: Error in opcode %s: %s",
828
                             idx + 1, count, op_summary, err)
829

  
830
                to_encode = errors.OpExecError("Preceding opcode failed")
831
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
832
                                      _EncodeOpError(to_encode))
833

  
834
                # Consistency check
835
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
836
                                  for i in job.ops[:idx])
837
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
838
                                  errors.GetEncodedError(i.result)
839
                                  for i in job.ops[idx:])
840
              finally:
841
                job.end_timestamp = TimeStampNow()
842
                queue.UpdateJobUnlocked(job)
843
            finally:
844
              queue.release()
845
            raise
846

  
847
      except CancelJob:
848
        queue.acquire(shared=1)
849
        try:
850
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
851
                                "Job canceled by request")
852
          job.end_timestamp = TimeStampNow()
853
          queue.UpdateJobUnlocked(job)
854
        finally:
855
          queue.release()
856
      except errors.GenericError, err:
857
        logging.exception("Ganeti exception")
858
      except:
859
        logging.exception("Unhandled exception")
860
    finally:
861
      status = job.CalcStatus()
862
      logging.info("Finished job %s, status = %s", job.id, status)
965
    proc = mcpu.Processor(queue.context, job.id)
966

  
967
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
968
      # Schedule again
969
      raise workerpool.DeferTask()
863 970

  
864 971

  
865 972
class _JobQueueWorkerPool(workerpool.WorkerPool):

Also available in: Unified diff