Add job queue size limit
authorMichael Hanselmann <hansmi@google.com>
Wed, 17 Dec 2008 13:18:35 +0000 (13:18 +0000)
committerMichael Hanselmann <hansmi@google.com>
Wed, 17 Dec 2008 13:18:35 +0000 (13:18 +0000)
A job queue with too many jobs can increase memory usage and/or make
the master daemon slow. The current limit is just an arbitrary number.
A "soft" limit for automatic job archival is prepared.

Reviewed-by: iustinp

lib/cli.py
lib/constants.py
lib/errors.py
lib/jqueue.py

index 737c401..dea7078 100644 (file)
@@ -687,6 +687,9 @@ def FormatError(err):
   elif isinstance(err, errors.JobQueueDrainError):
     obuf.write("Failure: the job queue is marked for drain and doesn't"
                " accept new requests\n")
+  elif isinstance(err, errors.JobQueueFull):
+    obuf.write("Failure: the job queue is full and doesn't accept new"
+               " job submissions until old jobs are archived\n")
   elif isinstance(err, errors.GenericError):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
index a5748c9..7dfa2ac 100644 (file)
@@ -366,6 +366,8 @@ JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version"
 JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
 JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
 JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain"
+JOB_QUEUE_SIZE_HARD_LIMIT = 5000
+JOB_QUEUE_SIZE_SOFT_LIMIT = JOB_QUEUE_SIZE_HARD_LIMIT * 0.8
 
 JOB_ID_TEMPLATE = r"\d+"
 
index 4f3d86d..b54a149 100644 (file)
@@ -253,6 +253,14 @@ class JobQueueDrainError(JobQueueError):
   """
 
 
+class JobQueueFull(JobQueueError):
+  """Job queue full error.
+
+  Raised when job queue size reached its hard limit.
+
+  """
+
+
 # errors should be added above
 
 
index 38b13b7..3734476 100644 (file)
@@ -941,6 +941,18 @@ class JobQueue(object):
     """
     if self._IsQueueMarkedDrain():
       raise errors.JobQueueDrainError()
+
+    # Check job queue size
+    size = len(self._ListJobFiles())
+    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
+      # TODO: Autoarchive jobs. Make sure it's not done on every job
+      # submission, though.
+      #size = ...
+      pass
+
+    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+      raise errors.JobQueueFull()
+
     # Get job identifier
     job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)