Revision de9d02c7 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
910 | 910 |
# Setup worker pool |
911 | 911 |
self._wpool = _JobQueueWorkerPool(self) |
912 | 912 |
try: |
913 |
# We need to lock here because WorkerPool.AddTask() may start a job while |
|
914 |
# we're still doing our work. |
|
915 |
self.acquire() |
|
916 |
try: |
|
917 |
logging.info("Inspecting job queue") |
|
913 |
self._InspectQueue() |
|
914 |
except: |
|
915 |
self._wpool.TerminateWorkers() |
|
916 |
raise |
|
917 |
|
|
918 |
@locking.ssynchronized(_LOCK) |
|
919 |
@_RequireOpenQueue |
|
920 |
def _InspectQueue(self): |
|
921 |
"""Loads the whole job queue and resumes unfinished jobs. |
|
922 |
|
|
923 |
This function needs the lock here because WorkerPool.AddTask() may start a |
|
924 |
job while we're still doing our work. |
|
925 |
|
|
926 |
""" |
|
927 |
logging.info("Inspecting job queue") |
|
918 | 928 |
|
919 |
all_job_ids = self._GetJobIDsUnlocked() |
|
920 |
jobs_count = len(all_job_ids) |
|
929 |
all_job_ids = self._GetJobIDsUnlocked() |
|
930 |
jobs_count = len(all_job_ids) |
|
931 |
lastinfo = time.time() |
|
932 |
for idx, job_id in enumerate(all_job_ids): |
|
933 |
# Give an update every 1000 jobs or 10 seconds |
|
934 |
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or |
|
935 |
idx == (jobs_count - 1)): |
|
936 |
logging.info("Job queue inspection: %d/%d (%0.1f %%)", |
|
937 |
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) |
|
921 | 938 |
lastinfo = time.time() |
922 |
for idx, job_id in enumerate(all_job_ids): |
|
923 |
# Give an update every 1000 jobs or 10 seconds |
|
924 |
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or |
|
925 |
idx == (jobs_count - 1)): |
|
926 |
logging.info("Job queue inspection: %d/%d (%0.1f %%)", |
|
927 |
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) |
|
928 |
lastinfo = time.time() |
|
929 |
|
|
930 |
job = self._LoadJobUnlocked(job_id) |
|
931 |
|
|
932 |
# a failure in loading the job can cause 'None' to be returned |
|
933 |
if job is None: |
|
934 |
continue |
|
935 | 939 |
|
936 |
status = job.CalcStatus()
|
|
940 |
job = self._LoadJobUnlocked(job_id)
|
|
937 | 941 |
|
938 |
if status in (constants.JOB_STATUS_QUEUED, ): |
|
939 |
self._wpool.AddTask((job, )) |
|
942 |
# a failure in loading the job can cause 'None' to be returned |
|
943 |
if job is None: |
|
944 |
continue |
|
940 | 945 |
|
941 |
elif status in (constants.JOB_STATUS_RUNNING, |
|
942 |
constants.JOB_STATUS_WAITLOCK, |
|
943 |
constants.JOB_STATUS_CANCELING): |
|
944 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
|
945 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
|
946 |
"Unclean master daemon shutdown") |
|
947 |
self.UpdateJobUnlocked(job) |
|
946 |
status = job.CalcStatus() |
|
948 | 947 |
|
949 |
logging.info("Job queue inspection finished") |
|
950 |
finally: |
|
951 |
self.release() |
|
952 |
except: |
|
953 |
self._wpool.TerminateWorkers() |
|
954 |
raise |
|
948 |
if status in (constants.JOB_STATUS_QUEUED, ): |
|
949 |
self._wpool.AddTask((job, )) |
|
950 |
|
|
951 |
elif status in (constants.JOB_STATUS_RUNNING, |
|
952 |
constants.JOB_STATUS_WAITLOCK, |
|
953 |
constants.JOB_STATUS_CANCELING): |
|
954 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
|
955 |
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
|
956 |
"Unclean master daemon shutdown") |
|
957 |
self.UpdateJobUnlocked(job) |
|
958 |
|
|
959 |
logging.info("Job queue inspection finished") |
|
955 | 960 |
|
956 | 961 |
@locking.ssynchronized(_LOCK) |
957 | 962 |
@_RequireOpenQueue |
Also available in: Unified diff