Cache a few bits of status in jqueue
authorGuido Trotter <ultrotter@google.com>
Fri, 11 Jun 2010 11:25:59 +0000 (12:25 +0100)
committerGuido Trotter <ultrotter@google.com>
Fri, 11 Jun 2010 16:06:33 +0000 (17:06 +0100)
Currently each time we submit a job we check the job queue size, and the
drained file. With this change we keep these pieces of information in
memory and don't read them from the filesystem each time.

Significant changes include:
  - The drained value can only be properly set by calling the
    appropriate cluster command "gnt-cluster queue drain/undrain" and
    not by removing/creating the file in the job queue directory. Not
    that anybody would have done it in this undocumented way before.
  - We get rid of the soft limit for the job queue, which we haven't
    ever used anyway.

Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>

lib/constants.py
lib/jqueue.py

index 94eac9d..15a4257 100644 (file)
@@ -695,7 +695,6 @@ JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
 JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
 JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain"
 JOB_QUEUE_SIZE_HARD_LIMIT = 5000
-JOB_QUEUE_SIZE_SOFT_LIMIT = JOB_QUEUE_SIZE_HARD_LIMIT * 0.8
 JOB_QUEUE_DIRS = [QUEUE_DIR, JOB_QUEUE_ARCHIVE_DIR]
 JOB_QUEUE_DIRS_MODE = SECURE_DIR_MODE
 
index 306fdcf..d6b20ea 100644 (file)
@@ -628,6 +628,10 @@ class JobQueue(object):
 
     # TODO: Check consistency across nodes
 
+    self._queue_size = 0
+    self._UpdateQueueSizeUnlocked()
+    self._drained = self._IsQueueMarkedDrain()
+
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
@@ -997,8 +1001,15 @@ class JobQueue(object):
     """
     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
 
-  @staticmethod
-  def SetDrainFlag(drain_flag):
+  def _UpdateQueueSizeUnlocked(self):
+    """Update the queue size.
+
+    """
+    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SetDrainFlag(self, drain_flag):
     """Sets the drain flag for the queue.
 
     @type drain_flag: boolean
@@ -1009,6 +1020,9 @@ class JobQueue(object):
       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
     else:
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+    self._drained = drain_flag
+
     return True
 
   @_RequireOpenQueue
@@ -1027,18 +1041,12 @@ class JobQueue(object):
     @raise errors.JobQueueDrainError: if the job is marked for draining
 
     """
-    if self._IsQueueMarkedDrain():
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    if self._drained:
       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
-    # Check job queue size
-    size = len(self._GetJobIDsUnlocked(sort=False))
-    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
-      # TODO: Autoarchive jobs. Make sure it's not done on every job
-      # submission, though.
-      #size = ...
-      pass
-
-    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
@@ -1046,6 +1054,8 @@ class JobQueue(object):
     # Write to disk
     self.UpdateJobUnlocked(job)
 
+    self._queue_size += 1
+
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
@@ -1250,6 +1260,11 @@ class JobQueue(object):
     logging.debug("Successfully archived job(s) %s",
                   utils.CommaJoin(job.id for job in archive_jobs))
 
+    # Since we haven't quite checked, above, if we succeeded or failed renaming
+    # the files, we update the cached queue size from the filesystem. When we
+    # get around to fix the TODO: above, we can use the number of actually
+    # archived jobs to fix this.
+    self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
   @utils.LockedMethod