From de9d02c7b032f53633ed8d8933bf38af4274742b Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Wed, 8 Sep 2010 19:25:07 +0200 Subject: [PATCH] jqueue: Move queue inspection into separate function MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This makes the __init__ function a lot smaller while not changing functionality. Signed-off-by: Michael Hanselmann Reviewed-by: René Nussbaumer --- lib/jqueue.py | 77 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index fb269f5..bd4d986 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -910,48 +910,53 @@ class JobQueue(object): # Setup worker pool self._wpool = _JobQueueWorkerPool(self) try: - # We need to lock here because WorkerPool.AddTask() may start a job while - # we're still doing our work. - self.acquire() - try: - logging.info("Inspecting job queue") + self._InspectQueue() + except: + self._wpool.TerminateWorkers() + raise + + @locking.ssynchronized(_LOCK) + @_RequireOpenQueue + def _InspectQueue(self): + """Loads the whole job queue and resumes unfinished jobs. + + This function needs the lock here because WorkerPool.AddTask() may start a + job while we're still doing our work. + + """ + logging.info("Inspecting job queue") - all_job_ids = self._GetJobIDsUnlocked() - jobs_count = len(all_job_ids) + all_job_ids = self._GetJobIDsUnlocked() + jobs_count = len(all_job_ids) + lastinfo = time.time() + for idx, job_id in enumerate(all_job_ids): + # Give an update every 1000 jobs or 10 seconds + if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or + idx == (jobs_count - 1)): + logging.info("Job queue inspection: %d/%d (%0.1f %%)", + idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) lastinfo = time.time() - for idx, job_id in enumerate(all_job_ids): - # Give an update every 1000 jobs or 10 seconds - if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or - idx == (jobs_count - 1)): - logging.info("Job queue inspection: %d/%d (%0.1f %%)", - idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) - lastinfo = time.time() - - job = self._LoadJobUnlocked(job_id) - - # a failure in loading the job can cause 'None' to be returned - if job is None: - continue - status = job.CalcStatus() + job = self._LoadJobUnlocked(job_id) - if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask((job, )) + # a failure in loading the job can cause 'None' to be returned + if job is None: + continue - elif status in (constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK, - constants.JOB_STATUS_CANCELING): - logging.warning("Unfinished job %s found: %s", job.id, job) - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - "Unclean master daemon shutdown") - self.UpdateJobUnlocked(job) + status = job.CalcStatus() - logging.info("Job queue inspection finished") - finally: - self.release() - except: - self._wpool.TerminateWorkers() - raise + if status in (constants.JOB_STATUS_QUEUED, ): + self._wpool.AddTask((job, )) + + elif status in (constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_CANCELING): + logging.warning("Unfinished job %s found: %s", job.id, job) + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") + self.UpdateJobUnlocked(job) + + logging.info("Job queue inspection finished") @locking.ssynchronized(_LOCK) @_RequireOpenQueue -- 1.7.10.4