# TODO: Check consistency across nodes
+ self._queue_size = 0
+ self._UpdateQueueSizeUnlocked()
+ self._drained = self._IsQueueMarkedDrain()
+
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
try:
"""
return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
- @staticmethod
- def SetDrainFlag(drain_flag):
+ def _UpdateQueueSizeUnlocked(self):
+ """Update the queue size.
+
+ """
+ self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def SetDrainFlag(self, drain_flag):
"""Sets the drain flag for the queue.
@type drain_flag: boolean
utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
else:
utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+ self._drained = drain_flag
+
return True
@_RequireOpenQueue
@raise errors.JobQueueDrainError: if the job is marked for draining
"""
- if self._IsQueueMarkedDrain():
+ # Ok when sharing the big job queue lock, as the drain file is created when
+ # the lock is exclusive.
+ if self._drained:
raise errors.JobQueueDrainError("Job queue is drained, refusing job")
- # Check job queue size
- size = len(self._GetJobIDsUnlocked(sort=False))
- if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
- # TODO: Autoarchive jobs. Make sure it's not done on every job
- # submission, though.
- #size = ...
- pass
-
- if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+ if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull()
job = _QueuedJob(self, job_id, ops)
# Write to disk
self.UpdateJobUnlocked(job)
+ self._queue_size += 1
+
logging.debug("Adding new job %s to the cache", job_id)
self._memcache[job_id] = job
logging.debug("Successfully archived job(s) %s",
utils.CommaJoin(job.id for job in archive_jobs))
+ # Since we haven't quite checked, above, if we succeeded or failed renaming
+ # the files, we update the cached queue size from the filesystem. When we
+ # get around to fix the TODO: above, we can use the number of actually
+ # archived jobs to fix this.
+ self._UpdateQueueSizeUnlocked()
return len(archive_jobs)
@utils.LockedMethod