X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6e237482804931b883790a2e6e439565ec999056..fc8a6b8f3334c1352caafcede1e31d801fc146ba:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index e06c5f8..8657fed 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -460,7 +460,10 @@ class _JobQueueWorker(workerpool.BaseWorker): try: try: op.status = constants.OP_STATUS_ERROR - op.result = str(err) + if isinstance(err, errors.GenericError): + op.result = errors.EncodeException(err) + else: + op.result = str(err) op.end_timestamp = TimeStampNow() logging.info("Op %s/%s: Error in opcode %s: %s", idx + 1, count, op_summary, err) @@ -506,33 +509,37 @@ class _JobQueueWorkerPool(workerpool.WorkerPool): self.queue = queue -class JobQueue(object): - """Queue used to manage the jobs. +def _RequireOpenQueue(fn): + """Decorator for "public" functions. - @cvar _RE_JOB_FILE: regex matching the valid job file names + This function should be used for all 'public' functions. That is, + functions usually called from other classes. Note that this should + be applied only to methods (not plain functions), since it expects + that the decorated function is called with a first argument that has + a '_queue_lock' attribute. - """ - _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) + @warning: Use this decorator only after utils.LockedMethod! - def _RequireOpenQueue(fn): - """Decorator for "public" functions. + Example:: + @utils.LockedMethod + @_RequireOpenQueue + def Example(self): + pass - This function should be used for all 'public' functions. That is, - functions usually called from other classes. + """ + def wrapper(self, *args, **kwargs): + assert self._queue_lock is not None, "Queue should be open" + return fn(self, *args, **kwargs) + return wrapper - @warning: Use this decorator only after utils.LockedMethod! 
- Example:: - @utils.LockedMethod - @_RequireOpenQueue - def Example(self): - pass +class JobQueue(object): + """Queue used to manage the jobs. - """ - def wrapper(self, *args, **kwargs): - assert self._queue_lock is not None, "Queue should be open" - return fn(self, *args, **kwargs) - return wrapper + @cvar _RE_JOB_FILE: regex matching the valid job file names + + """ + _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) def __init__(self, context): """Constructor for JobQueue. @@ -796,26 +803,31 @@ class JobQueue(object): """ return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) - def _NewSerialUnlocked(self): + def _NewSerialsUnlocked(self, count): """Generates a new job identifier. Job identifiers are unique during the lifetime of a cluster. + @type count: integer + @param count: how many serials to return @rtype: str @return: a string representing the job identifier. """ + assert count > 0 # New number - serial = self._last_serial + 1 + serial = self._last_serial + count # Write to file self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, "%s\n" % serial) + result = [self._FormatJobID(v) + for v in range(self._last_serial + 1, serial + 1)] # Keep it only if we were able to write the file self._last_serial = serial - return self._FormatJobID(serial) + return result @staticmethod def _GetJobPath(job_id): @@ -981,12 +993,14 @@ class JobQueue(object): return True @_RequireOpenQueue - def _SubmitJobUnlocked(self, ops): + def _SubmitJobUnlocked(self, job_id, ops): """Create and store a new job. This enters the job into our job queue and also puts it on the new queue, in order for it to be picked up by the queue processors. + @type job_id: job ID + @param job_id: the job ID for the new job @type ops: list @param ops: The list of OpCodes that will become the new job. 
@rtype: job ID @@ -1008,8 +1022,6 @@ class JobQueue(object): if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: raise errors.JobQueueFull() - # Get job identifier - job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops) # Write to disk @@ -1031,7 +1043,8 @@ class JobQueue(object): @see: L{_SubmitJobUnlocked} """ - return self._SubmitJobUnlocked(ops) + job_id = self._NewSerialsUnlocked(1)[0] + return self._SubmitJobUnlocked(job_id, ops) @utils.LockedMethod @_RequireOpenQueue @@ -1042,9 +1055,10 @@ class JobQueue(object): """ results = [] - for ops in jobs: + all_job_ids = self._NewSerialsUnlocked(len(jobs)) + for job_id, ops in zip(all_job_ids, jobs): try: - data = self._SubmitJobUnlocked(ops) + data = self._SubmitJobUnlocked(job_id, ops) status = True except errors.GenericError, err: data = str(err)