Optimise multi-job submit
authorIustin Pop <iustin@google.com>
Mon, 7 Sep 2009 13:02:07 +0000 (15:02 +0200)
committerIustin Pop <iustin@google.com>
Mon, 7 Sep 2009 13:38:30 +0000 (15:38 +0200)
Currently, on multi-job submits we simply iterate over the
single-job-submit function. This means we grab a new serial, write and
replicate (and wait for the remote nodes to ack) the serial file, and
only then create the job file; this is repeated N times, once for each
job.

Since job identifiers are ‘cheap’, it's simpler to simply grab at the
start a block of new IDs, write and replicate the serial count file a
single time, and then proceed with the jobs as before. This is a cheap
change that reduces I/O and reduces slightly the CPU consumption of the
master daemon: submit time seems to be cut in half for big batches of
jobs and the masterd cpu time by (I can't get consistent numbers)
between 15%-50%.

Note that this doesn't change anything for single-job submits and most
probably for < 5 job submits either.

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>

lib/jqueue.py

index e06c5f8..9219cae 100644 (file)
@@ -796,26 +796,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -981,12 +986,14 @@ class JobQueue(object):
     return True
 
   @_RequireOpenQueue
-  def _SubmitJobUnlocked(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param jod_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -1008,8 +1015,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -1031,7 +1036,8 @@ class JobQueue(object):
     @see: L{_SubmitJobUnlocked}
 
     """
-    return self._SubmitJobUnlocked(ops)
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1042,9 +1048,10 @@ class JobQueue(object):
 
     """
     results = []
-    for ops in jobs:
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
       try:
-        data = self._SubmitJobUnlocked(ops)
+        data = self._SubmitJobUnlocked(job_id, ops)
         status = True
       except errors.GenericError, err:
         data = str(err)