Revision fcd70b89 lib/jqueue.py

b/lib/jqueue.py
1712 1712
      self._wpool.TerminateWorkers()
1713 1713
      raise
1714 1714

  
1715
  def _PickupJobUnlocked(self, job_id):
1716
    """Load a job from the job queue
1717

  
1718
    Pick up a job that already is in the job queue and start/resume it.
1719

  
1720
    """
1721
    job = self._LoadJobUnlocked(job_id)
1722

  
1723
    if job is None:
1724
      logging.warning("Job %s could not be read", job_id)
1725
      return
1726

  
1727
    status = job.CalcStatus()
1728

  
1729
    if status == constants.JOB_STATUS_QUEUED:
1730
      self._EnqueueJobsUnlocked([job])
1731
      logging.info("Restarting job %s", job.id)
1732

  
1733
    elif status in (constants.JOB_STATUS_RUNNING,
1734
                    constants.JOB_STATUS_WAITING,
1735
                    constants.JOB_STATUS_CANCELING):
1736
      logging.warning("Unfinished job %s found: %s", job.id, job)
1737

  
1738
      if status == constants.JOB_STATUS_WAITING:
1739
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1740
        self._EnqueueJobsUnlocked([job])
1741
        logging.info("Restarting job %s", job.id)
1742
      else:
1743
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1744
                              "Unclean master daemon shutdown")
1745
        job.Finalize()
1746

  
1747
    self.UpdateJobUnlocked(job)
1748

  
1749
  @locking.ssynchronized(_LOCK)
1750
  def PickupJob(self, job_id):
1751
    self._PickupJobUnlocked(job_id)
1752

  
1715 1753
  @locking.ssynchronized(_LOCK)
1716 1754
  @_RequireOpenQueue
1717 1755
  def _InspectQueue(self):
......
1723 1761
    """
1724 1762
    logging.info("Inspecting job queue")
1725 1763

  
1726
    restartjobs = []
1727

  
1728 1764
    all_job_ids = self._GetJobIDsUnlocked()
1729 1765
    jobs_count = len(all_job_ids)
1730 1766
    lastinfo = time.time()
......
1736 1772
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1737 1773
        lastinfo = time.time()
1738 1774

  
1739
      job = self._LoadJobUnlocked(job_id)
1740

  
1741
      # a failure in loading the job can cause 'None' to be returned
1742
      if job is None:
1743
        continue
1744

  
1745
      status = job.CalcStatus()
1746

  
1747
      if status == constants.JOB_STATUS_QUEUED:
1748
        restartjobs.append(job)
1749

  
1750
      elif status in (constants.JOB_STATUS_RUNNING,
1751
                      constants.JOB_STATUS_WAITING,
1752
                      constants.JOB_STATUS_CANCELING):
1753
        logging.warning("Unfinished job %s found: %s", job.id, job)
1754

  
1755
        if status == constants.JOB_STATUS_WAITING:
1756
          # Restart job
1757
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1758
          restartjobs.append(job)
1759
        else:
1760
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1761
                                "Unclean master daemon shutdown")
1762
          job.Finalize()
1763

  
1764
        self.UpdateJobUnlocked(job)
1765

  
1766
    if restartjobs:
1767
      logging.info("Restarting %s jobs", len(restartjobs))
1768
      self._EnqueueJobsUnlocked(restartjobs)
1775
      self._PickupJobUnlocked(job_id)
1769 1776

  
1770 1777
    logging.info("Job queue inspection finished")
1771 1778

  

Also available in: Unified diff