elif isinstance(err, errors.JobQueueDrainError):
obuf.write("Failure: the job queue is marked for drain and doesn't"
" accept new requests\n")
+ elif isinstance(err, errors.JobQueueFull):
+ obuf.write("Failure: the job queue is full and doesn't accept new"
+ " job submissions until old jobs are archived\n")
elif isinstance(err, errors.GenericError):
obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain"
+JOB_QUEUE_SIZE_HARD_LIMIT = 5000
+JOB_QUEUE_SIZE_SOFT_LIMIT = JOB_QUEUE_SIZE_HARD_LIMIT * 0.8
JOB_ID_TEMPLATE = r"\d+"
"""
+class JobQueueFull(JobQueueError):
+ """Job queue full error.
+
+ Raised when job queue size reached its hard limit.
+
+ """
+
+
# errors should be added above
"""
if self._IsQueueMarkedDrain():
raise errors.JobQueueDrainError()
+
+ # Check job queue size
+ size = len(self._ListJobFiles())
+ if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
+ # TODO: Autoarchive jobs. Make sure it's not done on every job
+ # submission, though.
+ #size = ...
+ pass
+
+ if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+ raise errors.JobQueueFull()
+
# Get job identifier
job_id = self._NewSerialUnlocked()
job = _QueuedJob(self, job_id, ops)