From 009e73d0ce85a5cc2070f4b9caf2ff5244cc4af5 Mon Sep 17 00:00:00 2001
From: Iustin Pop
Date: Mon, 7 Sep 2009 15:02:07 +0200
Subject: [PATCH] Optimise multi-job submit
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Currently, on multi-job submits we simply iterate over the
single-job-submit function. This means that for each job we grab a new
serial, write and replicate the serial file (waiting for the remote
nodes to ack it), and only then create the job file; this is repeated N
times, once per job.

Since job identifiers are ‘cheap’, it's simpler to grab a block of new
IDs at the start, write and replicate the serial file a single time,
and then proceed with the jobs as before. This is a cheap change that
reduces I/O and slightly reduces the CPU consumption of the master
daemon: submit time seems to be cut in half for big batches of jobs,
and masterd CPU time drops by somewhere between 15% and 50% (I can't
get consistent numbers).

Note that this doesn't change anything for single-job submits, and most
probably not for batches of fewer than five jobs either.

Signed-off-by: Iustin Pop
Reviewed-by: Michael Hanselmann
---
 lib/jqueue.py | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index e06c5f8..9219cae 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -796,26 +796,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
-    @rtype: str
-    @return: a string representing the job identifier.
+    @type count: integer
+    @param count: how many serials to return
+    @rtype: list of str
+    @return: a list of job identifiers.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -981,12 +986,14 @@ class JobQueue(object):
     return True
 
   @_RequireOpenQueue
-  def _SubmitJobUnlocked(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -1008,8 +1015,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -1031,7 +1036,8 @@ class JobQueue(object):
     @see: L{_SubmitJobUnlocked}
 
     """
-    return self._SubmitJobUnlocked(ops)
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1042,9 +1048,10 @@ class JobQueue(object):
 
     """
     results = []
-    for ops in jobs:
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
       try:
-        data = self._SubmitJobUnlocked(ops)
+        data = self._SubmitJobUnlocked(job_id, ops)
         status = True
       except errors.GenericError, err:
         data = str(err)
-- 
1.7.10.4
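
As an illustration of the pattern the patch introduces, below is a
minimal standalone sketch of batched serial allocation: reserve a block
of IDs with a single write of the serial file, then hand one ID to each
submitted job. This is not the Ganeti implementation: ToyJobQueue,
SERIAL_FILE, _new_serials and submit_many are invented for the example,
and the real code additionally replicates the serial file to the remote
nodes and runs under the queue lock.

import os
import tempfile

SERIAL_FILE = "serial"  # stand-in for constants.JOB_QUEUE_SERIAL_FILE

class ToyJobQueue(object):
  def __init__(self, directory):
    self._serial_path = os.path.join(directory, SERIAL_FILE)
    if os.path.exists(self._serial_path):
      self._last_serial = int(open(self._serial_path).read())
    else:
      self._last_serial = 0

  def _new_serials(self, count):
    """Reserve 'count' job IDs with a single write of the serial file."""
    assert count > 0
    serial = self._last_serial + count
    # One write for the whole block instead of one write per job; in
    # Ganeti this is also a single replication round-trip to the nodes.
    f = open(self._serial_path, "w")
    try:
      f.write("%s\n" % serial)
    finally:
      f.close()
    result = [str(v) for v in range(self._last_serial + 1, serial + 1)]
    # Advance the in-memory counter only after the write succeeded
    self._last_serial = serial
    return result

  def submit_many(self, jobs):
    """Pair each job with a pre-allocated ID, paying the I/O cost once."""
    job_ids = self._new_serials(len(jobs))
    return [(job_id, ops) for job_id, ops in zip(job_ids, jobs)]

queue = ToyJobQueue(tempfile.mkdtemp())
print(queue.submit_many([["op-a"], ["op-b"], ["op-c"]]))
# -> [('1', ['op-a']), ('2', ['op-b']), ('3', ['op-c'])]

This also shows why the commit message can call identifiers ‘cheap’:
the on-disk serial is advanced before any job file is created, so a
crash after the write at worst skips a few serials; it can never hand
out a duplicate, which is exactly the property the batched allocation
relies on.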