Revision 39348887 lib/jqueue.py

b/lib/jqueue.py
1707 1707

  
1708 1708
    # Setup worker pool
1709 1709
    self._wpool = _JobQueueWorkerPool(self)
1710
    try:
1711
      self._InspectQueue()
1712
    except:
1713
      self._wpool.TerminateWorkers()
1714
      raise
1715 1710

  
1716 1711
  def _PickupJobUnlocked(self, job_id):
1717 1712
    """Load a job from the job queue
......
1751 1746
  def PickupJob(self, job_id):
1752 1747
    self._PickupJobUnlocked(job_id)
1753 1748

  
1754
  @locking.ssynchronized(_LOCK)
1755
  @_RequireOpenQueue
1756
  def _InspectQueue(self):
1757
    """Loads the whole job queue and resumes unfinished jobs.
1758

  
1759
    This function needs the lock here because WorkerPool.AddTask() may start a
1760
    job while we're still doing our work.
1761

  
1762
    """
1763
    logging.info("Inspecting job queue")
1764

  
1765
    all_job_ids = self._GetJobIDsUnlocked()
1766
    jobs_count = len(all_job_ids)
1767
    lastinfo = time.time()
1768
    for idx, job_id in enumerate(all_job_ids):
1769
      # Give an update every 1000 jobs or 10 seconds
1770
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1771
          idx == (jobs_count - 1)):
1772
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1773
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1774
        lastinfo = time.time()
1775

  
1776
      self._PickupJobUnlocked(job_id)
1777

  
1778
    logging.info("Job queue inspection finished")
1779

  
1780 1749
  def _GetRpc(self, address_list):
1781 1750
    """Gets RPC runner with context.
1782 1751

  

Also available in: Unified diff