Revision fcd70b89 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
1712 | 1712 |
self._wpool.TerminateWorkers() |
1713 | 1713 |
raise |
1714 | 1714 |
|
1715 |
def _PickupJobUnlocked(self, job_id): |
|
1716 |
"""Load a job from the job queue |
|
1717 |
|
|
1718 |
Pick up a job that already is in the job queue and start/resume it. |
|
1719 |
|
|
1720 |
""" |
|
1721 |
job = self._LoadJobUnlocked(job_id) |
|
1722 |
|
|
1723 |
if job is None: |
|
1724 |
logging.warning("Job %s could not be read", job_id) |
|
1725 |
return |
|
1726 |
|
|
1727 |
status = job.CalcStatus() |
|
1728 |
|
|
1729 |
if status == constants.JOB_STATUS_QUEUED: |
|
1730 |
self._EnqueueJobsUnlocked([job]) |
|
1731 |
logging.info("Restarting job %s", job.id) |
|
1732 |
|
|
1733 |
elif status in (constants.JOB_STATUS_RUNNING, |
|
1734 |
constants.JOB_STATUS_WAITING, |
|
1735 |
constants.JOB_STATUS_CANCELING): |
|
1736 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
|
1737 |
|
|
1738 |
if status == constants.JOB_STATUS_WAITING: |
|
1739 |
job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) |
|
1740 |
self._EnqueueJobsUnlocked([job]) |
|
1741 |
logging.info("Restarting job %s", job.id) |
|
1742 |
else: |
|
1743 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
|
1744 |
"Unclean master daemon shutdown") |
|
1745 |
job.Finalize() |
|
1746 |
|
|
1747 |
self.UpdateJobUnlocked(job) |
|
1748 |
|
|
1749 |
@locking.ssynchronized(_LOCK) |
|
1750 |
def PickupJob(self, job_id): |
|
1751 |
self._PickupJobUnlocked(job_id) |
|
1752 |
|
|
1715 | 1753 |
@locking.ssynchronized(_LOCK) |
1716 | 1754 |
@_RequireOpenQueue |
1717 | 1755 |
def _InspectQueue(self): |
... | ... | |
1723 | 1761 |
""" |
1724 | 1762 |
logging.info("Inspecting job queue") |
1725 | 1763 |
|
1726 |
restartjobs = [] |
|
1727 |
|
|
1728 | 1764 |
all_job_ids = self._GetJobIDsUnlocked() |
1729 | 1765 |
jobs_count = len(all_job_ids) |
1730 | 1766 |
lastinfo = time.time() |
... | ... | |
1736 | 1772 |
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) |
1737 | 1773 |
lastinfo = time.time() |
1738 | 1774 |
|
1739 |
job = self._LoadJobUnlocked(job_id) |
|
1740 |
|
|
1741 |
# a failure in loading the job can cause 'None' to be returned |
|
1742 |
if job is None: |
|
1743 |
continue |
|
1744 |
|
|
1745 |
status = job.CalcStatus() |
|
1746 |
|
|
1747 |
if status == constants.JOB_STATUS_QUEUED: |
|
1748 |
restartjobs.append(job) |
|
1749 |
|
|
1750 |
elif status in (constants.JOB_STATUS_RUNNING, |
|
1751 |
constants.JOB_STATUS_WAITING, |
|
1752 |
constants.JOB_STATUS_CANCELING): |
|
1753 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
|
1754 |
|
|
1755 |
if status == constants.JOB_STATUS_WAITING: |
|
1756 |
# Restart job |
|
1757 |
job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) |
|
1758 |
restartjobs.append(job) |
|
1759 |
else: |
|
1760 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
|
1761 |
"Unclean master daemon shutdown") |
|
1762 |
job.Finalize() |
|
1763 |
|
|
1764 |
self.UpdateJobUnlocked(job) |
|
1765 |
|
|
1766 |
if restartjobs: |
|
1767 |
logging.info("Restarting %s jobs", len(restartjobs)) |
|
1768 |
self._EnqueueJobsUnlocked(restartjobs) |
|
1775 |
self._PickupJobUnlocked(job_id) |
|
1769 | 1776 |
|
1770 | 1777 |
logging.info("Job queue inspection finished") |
1771 | 1778 |
|
Also available in: Unified diff