Revision 39348887 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
1707 | 1707 |
|
1708 | 1708 |
# Setup worker pool |
1709 | 1709 |
self._wpool = _JobQueueWorkerPool(self) |
1710 |
try: |
|
1711 |
self._InspectQueue() |
|
1712 |
except: |
|
1713 |
self._wpool.TerminateWorkers() |
|
1714 |
raise |
|
1715 | 1710 |
|
1716 | 1711 |
def _PickupJobUnlocked(self, job_id): |
1717 | 1712 |
"""Load a job from the job queue |
... | ... | |
1751 | 1746 |
def PickupJob(self, job_id): |
1752 | 1747 |
self._PickupJobUnlocked(job_id) |
1753 | 1748 |
|
1754 |
@locking.ssynchronized(_LOCK) |
|
1755 |
@_RequireOpenQueue |
|
1756 |
def _InspectQueue(self): |
|
1757 |
"""Loads the whole job queue and resumes unfinished jobs. |
|
1758 |
|
|
1759 |
This function needs the lock here because WorkerPool.AddTask() may start a |
|
1760 |
job while we're still doing our work. |
|
1761 |
|
|
1762 |
""" |
|
1763 |
logging.info("Inspecting job queue") |
|
1764 |
|
|
1765 |
all_job_ids = self._GetJobIDsUnlocked() |
|
1766 |
jobs_count = len(all_job_ids) |
|
1767 |
lastinfo = time.time() |
|
1768 |
for idx, job_id in enumerate(all_job_ids): |
|
1769 |
# Give an update every 1000 jobs or 10 seconds |
|
1770 |
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or |
|
1771 |
idx == (jobs_count - 1)): |
|
1772 |
logging.info("Job queue inspection: %d/%d (%0.1f %%)", |
|
1773 |
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) |
|
1774 |
lastinfo = time.time() |
|
1775 |
|
|
1776 |
self._PickupJobUnlocked(job_id) |
|
1777 |
|
|
1778 |
logging.info("Job queue inspection finished") |
|
1779 |
|
|
1780 | 1749 |
def _GetRpc(self, address_list): |
1781 | 1750 |
"""Gets RPC runner with context. |
1782 | 1751 |
|
Also available in: Unified diff