- count = len(job.ops)
- for idx, op in enumerate(job.ops):
- op_summary = op.input.Summary()
- if op.status == constants.OP_STATUS_SUCCESS:
- # this is a job that was partially completed before master
- # daemon shutdown, so it can be expected that some opcodes
- # are already completed successfully (if any did error
- # out, then the whole job should have been aborted and not
- # resubmitted for processing)
- logging.info("Op %s/%s: opcode %s already processed, skipping",
- idx + 1, count, op_summary)
- continue
- try:
- logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
- op_summary)
-
- queue.acquire(shared=1)
- try:
- if op.status == constants.OP_STATUS_CANCELED:
- logging.debug("Canceling opcode")
- raise CancelJob()
- assert op.status == constants.OP_STATUS_QUEUED
- logging.debug("Opcode %s/%s waiting for locks",
- idx + 1, count)
- op.status = constants.OP_STATUS_WAITLOCK
- op.result = None
- op.start_timestamp = TimeStampNow()
- if idx == 0: # first opcode
- job.start_timestamp = op.start_timestamp
- queue.UpdateJobUnlocked(job)
-
- input_opcode = op.input
- finally:
- queue.release()
-
- # Make sure not to hold queue lock while calling ExecOpCode
- result = proc.ExecOpCode(input_opcode,
- _OpExecCallbacks(queue, job, op))
-
- queue.acquire(shared=1)
- try:
- logging.debug("Opcode %s/%s succeeded", idx + 1, count)
- op.status = constants.OP_STATUS_SUCCESS
- op.result = result
- op.end_timestamp = TimeStampNow()
- if idx == count - 1:
- job.lock_status = None
- job.end_timestamp = TimeStampNow()
- queue.UpdateJobUnlocked(job)
- finally:
- queue.release()
-
- logging.info("Op %s/%s: Successfully finished opcode %s",
- idx + 1, count, op_summary)
- except CancelJob:
- # Will be handled further up
- raise
- except Exception, err:
- queue.acquire(shared=1)
- try:
- try:
- logging.debug("Opcode %s/%s failed", idx + 1, count)
- op.status = constants.OP_STATUS_ERROR
- if isinstance(err, errors.GenericError):
- to_encode = err
- else:
- to_encode = errors.OpExecError(str(err))
- op.result = errors.EncodeException(to_encode)
- op.end_timestamp = TimeStampNow()
- logging.info("Op %s/%s: Error in opcode %s: %s",
- idx + 1, count, op_summary, err)
- finally:
- job.lock_status = None
- job.end_timestamp = TimeStampNow()
- queue.UpdateJobUnlocked(job)
- finally:
- queue.release()
- raise
-
- except CancelJob:
- queue.acquire(shared=1)
+ (idx, op) = job.ops_iter.next()
+ except StopIteration:
+ raise errors.ProgrammerError("Called for a finished job")
+
+ if op.status == constants.OP_STATUS_RUNNING:
+ # Found an opcode already marked as running
+ raise errors.ProgrammerError("Called for job marked as running")
+
+ opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
+ timeout_strategy_factory)
+
+ if op.status == constants.OP_STATUS_CANCELED:
+ # Cancelled jobs are handled by the caller
+ assert not compat.any(i.status != constants.OP_STATUS_CANCELED
+ for i in job.ops[idx:])
+
+ elif op.status in constants.OPS_FINALIZED:
+ # This is a job that was partially completed before master daemon
+ # shutdown, so it can be expected that some opcodes are already
+ # completed successfully (if any did error out, then the whole job
+ # should have been aborted and not resubmitted for processing).
+ logging.info("%s: opcode %s already processed, skipping",
+ opctx.log_prefix, opctx.summary)
+ continue
+
+ return opctx
+
+  @staticmethod
+  def _MarkWaitlock(job, op):
+    """Marks an opcode as waiting for locks.
+
+    The job's start timestamp is also set if necessary.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
+    @rtype: bool
+    @return: Whether job or opcode state was modified; the caller uses
+      this to decide whether the job must be written back to disk
+
+    """
+    assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
+
+    # Clear any result left over from an earlier execution attempt
+    # (e.g. a previous lock-acquire round); not counted as an update
+    op.result = None
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    # Timestamps are only set once, so retries keep the original values
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
+
+    if job.start_timestamp is None:
+      job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
+
+  def _ExecOpCodeUnlocked(self, opctx):
+    """Processes one opcode and returns the result.
+
+    Must be called while not holding the queue lock (see the comment
+    before the C{opexec_fn} call below).
+
+    @type opctx: L{_OpExecContext}
+    @param opctx: Opcode execution context
+    @rtype: tuple
+    @return: Tuple of (new opcode status, result); the result is the
+      opcode's return value on success, an encoded exception on error,
+      and C{None} for the lock-timeout and canceling cases
+
+    """
+    op = opctx.op
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    # A timeout of None requests a blocking lock acquire (asserted in
+    # the LockAcquireTimeout handler below)
+    timeout = opctx.GetNextLockTimeout()
+
+    try:
+      # Make sure not to hold queue lock while calling ExecOpCode
+      result = self.opexec_fn(op.input,
+                              _OpExecCallbacks(self.queue, self.job, op),
+                              timeout=timeout, priority=op.priority)
+    except mcpu.LockAcquireTimeout:
+      assert timeout is not None, "Received timeout for blocking acquire"
+      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
+
+      assert op.status in (constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
+    except CancelJob:
+      logging.exception("%s: Canceling job", opctx.log_prefix)
+      assert op.status == constants.OP_STATUS_CANCELING
+      return (constants.OP_STATUS_CANCELING, None)
+    except Exception, err: # pylint: disable-msg=W0703
+      # Catch-all is deliberate here: any opcode failure is recorded as
+      # an error result instead of propagating into the job processor
+      logging.exception("%s: Caught exception in %s",
+                        opctx.log_prefix, opctx.summary)
+      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
+    else:
+      logging.debug("%s: %s successful",
+                    opctx.log_prefix, opctx.summary)
+      return (constants.OP_STATUS_SUCCESS, result)
+
+ def __call__(self, _nextop_fn=None):
+ """Continues execution of a job.
+
+ @param _nextop_fn: Callback function for tests
+ @rtype: bool
+ @return: True if job is finished, False if processor needs to be called
+ again
+
+ """
+ queue = self.queue
+ job = self.job
+
+ logging.debug("Processing job %s", job.id)
+
+ queue.acquire(shared=1)
+ try:
+ opcount = len(job.ops)
+
+ # Is a previous opcode still pending?
+ if job.cur_opctx:
+ opctx = job.cur_opctx
+ job.cur_opctx = None
+ else:
+ if __debug__ and _nextop_fn:
+ _nextop_fn()
+ opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
+
+ op = opctx.op
+
+ # Consistency check
+ assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELED)
+ for i in job.ops[opctx.index + 1:])
+
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELED)
+
+ assert (op.priority <= constants.OP_PRIO_LOWEST and
+ op.priority >= constants.OP_PRIO_HIGHEST)
+
+ if op.status not in (constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELED):
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
+ # Prepare to start opcode
+ if self._MarkWaitlock(job, op):
+ # Write to disk
+ queue.UpdateJobUnlocked(job)
+
+ assert op.status == constants.OP_STATUS_WAITLOCK
+ assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+ assert job.start_timestamp and op.start_timestamp
+
+ logging.info("%s: opcode %s waiting for locks",
+ opctx.log_prefix, opctx.summary)
+
+ queue.release()