+ try:
+ count = len(job.ops)
+ for idx, op in enumerate(job.ops):
+ op_summary = op.input.Summary()
+ try:
+ logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+ op_summary)
+
+ queue.acquire()
+ try:
+ if op.status == constants.OP_STATUS_CANCELED:
+ raise CancelJob()
+ assert op.status == constants.OP_STATUS_QUEUED
+ job.run_op_index = idx
+ op.status = constants.OP_STATUS_WAITLOCK
+ op.result = None
+ op.start_timestamp = TimeStampNow()
+ if idx == 0: # first opcode
+ job.start_timestamp = op.start_timestamp
+ queue.UpdateJobUnlocked(job)
+
+ input_opcode = op.input
+ finally:
+ queue.release()
+
def _Log(*args):
    """Record one log entry on the opcode currently being processed.

    Called with a single argument, that argument is the message and the
    entry is filed under constants.ELOG_MESSAGE. Called with two
    arguments, they are taken as (log type, message), in that order.

    More than two arguments trips the assertion below; zero arguments
    passes the assertion but then fails when the empty tuple is unpacked.

    """
    assert len(args) < 3

    if len(args) != 1:
        entry_type, entry_msg = args
    else:
        entry_type = constants.ELOG_MESSAGE
        entry_msg = args[0]

    # Taken before the queue lock is requested. The time is kept in split
    # form to make serialization easier without losing precision.
    now = utils.SplitTime(time.time())

    queue.acquire()
    try:
        # Bump the job-wide serial first so the new entry carries the
        # incremented value.
        job.log_serial += 1
        entry = (job.log_serial, now, entry_type, entry_msg)
        op.log.append(entry)

        # Signal job.change now that the entry has been appended.
        job.change.notifyAll()
    finally:
        queue.release()
+
+ # Make sure not to hold lock while _Log is called
+ self.opcode = op
+ result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+
+ queue.acquire()
+ try:
+ op.status = constants.OP_STATUS_SUCCESS
+ op.result = result
+ op.end_timestamp = TimeStampNow()
+ queue.UpdateJobUnlocked(job)
+ finally:
+ queue.release()
+
+ logging.info("Op %s/%s: Successfully finished opcode %s",
+ idx + 1, count, op_summary)
+ except CancelJob:
+ # Will be handled further up
+ raise
+ except Exception, err:
+ queue.acquire()
+ try:
+ try:
+ op.status = constants.OP_STATUS_ERROR
+ op.result = str(err)
+ op.end_timestamp = TimeStampNow()
+ logging.info("Op %s/%s: Error in opcode %s: %s",
+ idx + 1, count, op_summary, err)
+ finally:
+ queue.UpdateJobUnlocked(job)
+ finally:
+ queue.release()
+ raise
+
+ except CancelJob:
+ queue.acquire()
+ try:
+ queue.CancelJobUnlocked(job)
+ finally:
+ queue.release()
+ except errors.GenericError, err:
+ logging.exception("Ganeti exception")
+ except:
+ logging.exception("Unhandled exception")