try:
try:
op.status = constants.OP_STATUS_ERROR
- op.result = str(err)
+ if isinstance(err, errors.GenericError):
+ op.result = errors.EncodeException(err)
+ else:
+ op.result = str(err)
op.end_timestamp = TimeStampNow()
logging.info("Op %s/%s: Error in opcode %s: %s",
idx + 1, count, op_summary, err)
self.queue = queue
-class JobQueue(object):
- """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+ """Decorator for "public" functions.
- @cvar _RE_JOB_FILE: regex matching the valid job file names
+ This function should be used for all 'public' functions. That is,
+ functions usually called from other classes. Note that this should
+ be applied only to methods (not plain functions), since it expects
+ that the decorated function is called with a first argument that has
+ a '_queue_lock' attribute.
- """
- _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+ @warning: Use this decorator only after utils.LockedMethod!
- def _RequireOpenQueue(fn):
- """Decorator for "public" functions.
+ Example::
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def Example(self):
+ pass
- This function should be used for all 'public' functions. That is,
- functions usually called from other classes.
+ """
+ def wrapper(self, *args, **kwargs):
+ # A missing (None) _queue_lock is treated as "queue not open"; fail fast
+ # before delegating to the wrapped method
+ assert self._queue_lock is not None, "Queue should be open"
+ return fn(self, *args, **kwargs)
+ return wrapper
- @warning: Use this decorator only after utils.LockedMethod!
- Example::
- @utils.LockedMethod
- @_RequireOpenQueue
- def Example(self):
- pass
+class JobQueue(object):
+ """Queue used to manage the jobs.
- """
- def wrapper(self, *args, **kwargs):
- assert self._queue_lock is not None, "Queue should be open"
- return fn(self, *args, **kwargs)
- return wrapper
+ @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+ """
+ _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
def __init__(self, context):
"""Constructor for JobQueue.
"""
return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
- def _NewSerialUnlocked(self):
+ def _NewSerialsUnlocked(self, count):
- """Generates a new job identifier.
+ """Generates one or more new job identifiers.
Job identifiers are unique during the lifetime of a cluster.
+ @type count: integer
+ @param count: how many serials to return
- @rtype: str
- @return: a string representing the job identifier.
+ @rtype: list
+ @return: a list with exactly C{count} formatted job identifiers, in
+ increasing serial order
"""
+ assert count > 0
# New number
- serial = self._last_serial + 1
+ serial = self._last_serial + count
# Write to file
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
"%s\n" % serial)
+ # self._last_serial was already handed out by a previous call, so the
+ # new serials are last_serial + 1 .. serial (inclusive), i.e. exactly
+ # "count" values; starting at last_serial would reuse an existing ID
+ result = [self._FormatJobID(v)
+ for v in range(self._last_serial + 1, serial + 1)]
# Keep it only if we were able to write the file
self._last_serial = serial
- return self._FormatJobID(serial)
+ return result
@staticmethod
def _GetJobPath(job_id):
return True
@_RequireOpenQueue
- def _SubmitJobUnlocked(self, ops):
+ def _SubmitJobUnlocked(self, job_id, ops):
"""Create and store a new job.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
+ @type job_id: job ID
+ @param job_id: the job ID for the new job
@type ops: list
@param ops: The list of OpCodes that will become the new job.
@rtype: job ID
if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull()
- # Get job identifier
- job_id = self._NewSerialUnlocked()
job = _QueuedJob(self, job_id, ops)
# Write to disk
@see: L{_SubmitJobUnlocked}
"""
- return self._SubmitJobUnlocked(ops)
+ job_id = self._NewSerialsUnlocked(1)[0]
+ return self._SubmitJobUnlocked(job_id, ops)
@utils.LockedMethod
@_RequireOpenQueue
"""
results = []
- for ops in jobs:
+ all_job_ids = self._NewSerialsUnlocked(len(jobs))
+ for job_id, ops in zip(all_job_ids, jobs):
try:
- data = self._SubmitJobUnlocked(ops)
+ data = self._SubmitJobUnlocked(job_id, ops)
status = True
except errors.GenericError, err:
data = str(err)