"""
return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
- def _NewSerialUnlocked(self):
+ def _NewSerialsUnlocked(self, count):
- """Generates a new job identifier.
+ """Generates one or more new job identifiers.
Job identifiers are unique during the lifetime of a cluster.
+ @type count: integer
+ @param count: how many serials to return
- @rtype: str
- @return: a string representing the job identifier.
+ @rtype: list of str
+ @return: a list of exactly C{count} formatted job identifiers
"""
+ assert count > 0
# New number
- serial = self._last_serial + 1
+ serial = self._last_serial + count
# Write to file
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
"%s\n" % serial)
+ result = [self._FormatJobID(v)
+ for v in range(self._last_serial + 1, serial + 1)]
# Keep it only if we were able to write the file
self._last_serial = serial
- return self._FormatJobID(serial)
+ return result
@staticmethod
def _GetJobPath(job_id):
return True
@_RequireOpenQueue
- def _SubmitJobUnlocked(self, ops):
+ def _SubmitJobUnlocked(self, job_id, ops):
"""Create and store a new job.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
+ @type job_id: job ID
+ @param job_id: the job ID for the new job
@type ops: list
@param ops: The list of OpCodes that will become the new job.
@rtype: job ID
if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull()
- # Get job identifier
- job_id = self._NewSerialUnlocked()
job = _QueuedJob(self, job_id, ops)
# Write to disk
@see: L{_SubmitJobUnlocked}
"""
- return self._SubmitJobUnlocked(ops)
+ job_id = self._NewSerialsUnlocked(1)[0]
+ return self._SubmitJobUnlocked(job_id, ops)
@utils.LockedMethod
@_RequireOpenQueue
"""
results = []
- for ops in jobs:
+ all_job_ids = self._NewSerialsUnlocked(len(jobs))
+ for job_id, ops in zip(all_job_ids, jobs):
try:
- data = self._SubmitJobUnlocked(ops)
+ data = self._SubmitJobUnlocked(job_id, ops)
status = True
except errors.GenericError, err:
data = str(err)