Revision de9d02c7

b/lib/jqueue.py
910 910
    # Setup worker pool
911 911
    self._wpool = _JobQueueWorkerPool(self)
912 912
    try:
913
      # We need to lock here because WorkerPool.AddTask() may start a job while
914
      # we're still doing our work.
915
      self.acquire()
916
      try:
917
        logging.info("Inspecting job queue")
913
      self._InspectQueue()
914
    except:
915
      self._wpool.TerminateWorkers()
916
      raise
917

  
918
  @locking.ssynchronized(_LOCK)
919
  @_RequireOpenQueue
920
  def _InspectQueue(self):
921
    """Loads the whole job queue and resumes unfinished jobs.
922

  
923
    This function needs the lock here because WorkerPool.AddTask() may start a
924
    job while we're still doing our work.
925

  
926
    """
927
    logging.info("Inspecting job queue")
918 928

  
919
        all_job_ids = self._GetJobIDsUnlocked()
920
        jobs_count = len(all_job_ids)
929
    all_job_ids = self._GetJobIDsUnlocked()
930
    jobs_count = len(all_job_ids)
931
    lastinfo = time.time()
932
    for idx, job_id in enumerate(all_job_ids):
933
      # Give an update every 1000 jobs or 10 seconds
934
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
935
          idx == (jobs_count - 1)):
936
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
937
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
921 938
        lastinfo = time.time()
922
        for idx, job_id in enumerate(all_job_ids):
923
          # Give an update every 1000 jobs or 10 seconds
924
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
925
              idx == (jobs_count - 1)):
926
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
927
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
928
            lastinfo = time.time()
929

  
930
          job = self._LoadJobUnlocked(job_id)
931

  
932
          # a failure in loading the job can cause 'None' to be returned
933
          if job is None:
934
            continue
935 939

  
936
          status = job.CalcStatus()
940
      job = self._LoadJobUnlocked(job_id)
937 941

  
938
          if status in (constants.JOB_STATUS_QUEUED, ):
939
            self._wpool.AddTask((job, ))
942
      # a failure in loading the job can cause 'None' to be returned
943
      if job is None:
944
        continue
940 945

  
941
          elif status in (constants.JOB_STATUS_RUNNING,
942
                          constants.JOB_STATUS_WAITLOCK,
943
                          constants.JOB_STATUS_CANCELING):
944
            logging.warning("Unfinished job %s found: %s", job.id, job)
945
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
946
                                  "Unclean master daemon shutdown")
947
            self.UpdateJobUnlocked(job)
946
      status = job.CalcStatus()
948 947

  
949
        logging.info("Job queue inspection finished")
950
      finally:
951
        self.release()
952
    except:
953
      self._wpool.TerminateWorkers()
954
      raise
948
      if status in (constants.JOB_STATUS_QUEUED, ):
949
        self._wpool.AddTask((job, ))
950

  
951
      elif status in (constants.JOB_STATUS_RUNNING,
952
                      constants.JOB_STATUS_WAITLOCK,
953
                      constants.JOB_STATUS_CANCELING):
954
        logging.warning("Unfinished job %s found: %s", job.id, job)
955
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
956
                              "Unclean master daemon shutdown")
957
        self.UpdateJobUnlocked(job)
958

  
959
    logging.info("Job queue inspection finished")
955 960

  
956 961
  @locking.ssynchronized(_LOCK)
957 962
  @_RequireOpenQueue

Also available in: Unified diff