+ This is what we use to track the user-submitted jobs. Locking must
+ be taken care of by users of this class.
+
+ @type queue: L{JobQueue}
+ @ivar queue: the parent queue
+ @ivar id: the job ID
+ @type ops: list
+ @ivar ops: the list of _QueuedOpCode that constitute the job
+ @type run_op_index: int
+ @ivar run_op_index: the currently executing opcode, or -1 if
+ we didn't yet start executing
+ @type log_serial: int
+ @ivar log_serial: holds the index for the next log entry
+ @ivar received_timestamp: the timestamp for when the job was received
+ @ivar start_timestamp: the timestamp for start of execution
+ @ivar end_timestamp: the timestamp for end of execution
+ @ivar change: a Condition variable we use for waiting for job changes
+
+ """
def __init__(self, queue, job_id, ops):
  """Initialize a _QueuedJob from a list of opcodes.

  @type queue: L{JobQueue}
  @param queue: the queue this job belongs to
  @type job_id: job_id
  @param job_id: the identifier of this job
  @type ops: list
  @param ops: the opcodes we will run, each wrapped in a _QueuedOpCode

  """
  # A job without opcodes makes no sense; refuse it outright
  if not ops:
    # TODO: use a better exception
    raise Exception("No opcodes")

  self.queue = queue
  self.id = job_id

  # Wrap every opcode so per-opcode status, result and log can be tracked
  wrapped_ops = []
  for op in ops:
    wrapped_ops.append(_QueuedOpCode(op))
  self.ops = wrapped_ops

  # -1 means no opcode has started executing yet
  self.run_op_index = -1
  self.log_serial = 0
  self.received_timestamp = TimeStampNow()
  self.start_timestamp = None
  self.end_timestamp = None

  # Condition variable (sharing the queue's lock) used to signal job changes
  self.change = threading.Condition(self.queue._lock)
+
@classmethod
def Restore(cls, queue, state):
  """Restore a _QueuedJob from serialized state.

  @type queue: L{JobQueue}
  @param queue: to which queue the restored job belongs
  @type state: dict
  @param state: the serialized state
  @rtype: _QueuedJob
  @return: the restored _QueuedJob instance

  """
  # Bypass __init__: the serialized state already holds everything,
  # including the (already wrapped) opcode list. Using cls instead of a
  # hardcoded class name keeps this constructor subclass-friendly.
  obj = cls.__new__(cls)
  obj.queue = queue
  obj.id = state["id"]
  obj.run_op_index = state["run_op_index"]
  # Older serialized jobs may lack the timestamp fields, hence .get()
  obj.received_timestamp = state.get("received_timestamp", None)
  obj.start_timestamp = state.get("start_timestamp", None)
  obj.end_timestamp = state.get("end_timestamp", None)

  obj.ops = []
  obj.log_serial = 0
  for op_state in state["ops"]:
    op = _QueuedOpCode.Restore(op_state)
    # Continue log numbering after the highest serial seen in any opcode
    for log_entry in op.log:
      obj.log_serial = max(obj.log_serial, log_entry[0])
    obj.ops.append(op)

  # Condition to wait for changes
  obj.change = threading.Condition(obj.queue._lock)

  return obj
+
def Serialize(self):
  """Serialize the _QueuedJob instance.

  @rtype: dict
  @return: the serialized state

  """
  # Must stay in sync with Restore(): every key written here is read back
  # there (the timestamps via .get() for backwards compatibility).
  return {
    "id": self.id,
    "ops": [op.Serialize() for op in self.ops],
    "run_op_index": self.run_op_index,
    "start_timestamp": self.start_timestamp,
    "end_timestamp": self.end_timestamp,
    "received_timestamp": self.received_timestamp,
    }
+
def CalcStatus(self):
  """Compute the status of this job.

  This function iterates over all the _QueuedOpCodes in the job and
  based on their status, computes the job status.

  The algorithm is:
    - if we find a cancelled, or finished with error, the job
      status will be the same
    - otherwise, the last opcode with the status one of:
        - waitlock
        - canceling
        - running

      will determine the job status

    - otherwise, it means either all opcodes are queued, or success,
      and the job status will be the same

  @return: the job status

  """
  status = constants.JOB_STATUS_QUEUED

  all_success = True
  for op in self.ops:
    if op.status == constants.OP_STATUS_SUCCESS:
      continue

    all_success = False

    if op.status == constants.OP_STATUS_QUEUED:
      pass
    elif op.status == constants.OP_STATUS_WAITLOCK:
      status = constants.JOB_STATUS_WAITLOCK
    elif op.status == constants.OP_STATUS_RUNNING:
      status = constants.JOB_STATUS_RUNNING
    elif op.status == constants.OP_STATUS_CANCELING:
      status = constants.JOB_STATUS_CANCELING
      break
    elif op.status == constants.OP_STATUS_ERROR:
      status = constants.JOB_STATUS_ERROR
      # The whole job fails if one opcode failed
      break
    elif op.status == constants.OP_STATUS_CANCELED:
      # Fix: this must be the job-level constant, not the opcode-level
      # OP_STATUS_CANCELED that was assigned here before
      status = constants.JOB_STATUS_CANCELED
      break

  if all_success:
    status = constants.JOB_STATUS_SUCCESS

  return status
+
def GetLogEntries(self, newer_than):
  """Selectively returns the log entries.

  @type newer_than: None or int
  @param newer_than: if this is None, return all log entries,
      otherwise return only the log entries with serial higher
      than this value
  @rtype: list
  @return: the list of the log entries selected

  """
  # None means "everything"; any real serial is >= 0, so -1 matches all
  if newer_than is None:
    serial = -1
  else:
    serial = newer_than

  # Flatten the per-opcode logs, keeping only entries newer than 'serial'
  return [entry
          for op in self.ops
          for entry in op.log
          if entry[0] > serial]
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+ """The actual job workers.
+
+ """
+ def _NotifyStart(self):
+ """Mark the opcode as running, not lock-waiting.
+
+ This is called from the mcpu code as a notifier function, when the
+ LU is finally about to start the Exec() method. Of course, to have
+ end-user visible results, the opcode must be initially (before
+ calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+ """
+ assert self.queue, "Queue attribute is missing"
+ assert self.opcode, "Opcode attribute is missing"
+
+ self.queue.acquire()
+ try:
+ assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING)
+
+ # Cancel here if we were asked to
+ if self.opcode.status == constants.OP_STATUS_CANCELING:
+ raise CancelJob()
+
+ self.opcode.status = constants.OP_STATUS_RUNNING
+ finally:
+ self.queue.release()
+
+ def RunTask(self, job):
+ """Job executor.
+
+ This functions processes a job. It is closely tied to the _QueuedJob and
+ _QueuedOpCode classes.
+
+ @type job: L{_QueuedJob}
+ @param job: the job to be processed
+
+ """
+ logging.info("Worker %s processing job %s",
+ self.worker_id, job.id)
+ proc = mcpu.Processor(self.pool.queue.context)
+ self.queue = queue = job.queue
+ try:
+ try:
+ count = len(job.ops)
+ for idx, op in enumerate(job.ops):
+ op_summary = op.input.Summary()
+ try:
+ logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+ op_summary)
+
+ queue.acquire()
+ try:
+ if op.status == constants.OP_STATUS_CANCELED:
+ raise CancelJob()
+ assert op.status == constants.OP_STATUS_QUEUED
+ job.run_op_index = idx
+ op.status = constants.OP_STATUS_WAITLOCK
+ op.result = None
+ op.start_timestamp = TimeStampNow()
+ if idx == 0: # first opcode
+ job.start_timestamp = op.start_timestamp
+ queue.UpdateJobUnlocked(job)
+
+ input_opcode = op.input
+ finally:
+ queue.release()
+
+ def _Log(*args):
+ """Append a log entry.
+
+ """
+ assert len(args) < 3
+
+ if len(args) == 1:
+ log_type = constants.ELOG_MESSAGE
+ log_msg = args[0]
+ else:
+ log_type, log_msg = args
+
+ # The time is split to make serialization easier and not lose
+ # precision.
+ timestamp = utils.SplitTime(time.time())
+
+ queue.acquire()
+ try:
+ job.log_serial += 1
+ op.log.append((job.log_serial, timestamp, log_type, log_msg))
+
+ job.change.notifyAll()
+ finally:
+ queue.release()
+
+ # Make sure not to hold lock while _Log is called
+ self.opcode = op
+ result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+
+ queue.acquire()
+ try:
+ op.status = constants.OP_STATUS_SUCCESS
+ op.result = result
+ op.end_timestamp = TimeStampNow()
+ queue.UpdateJobUnlocked(job)
+ finally:
+ queue.release()
+
+ logging.info("Op %s/%s: Successfully finished opcode %s",
+ idx + 1, count, op_summary)
+ except CancelJob:
+ # Will be handled further up
+ raise
+ except Exception, err:
+ queue.acquire()
+ try:
+ try:
+ op.status = constants.OP_STATUS_ERROR
+ op.result = str(err)
+ op.end_timestamp = TimeStampNow()
+ logging.info("Op %s/%s: Error in opcode %s: %s",
+ idx + 1, count, op_summary, err)
+ finally:
+ queue.UpdateJobUnlocked(job)
+ finally:
+ queue.release()
+ raise
+
+ except CancelJob:
+ queue.acquire()
+ try:
+ queue.CancelJobUnlocked(job)
+ finally:
+ queue.release()
+ except errors.GenericError, err:
+ logging.exception("Ganeti exception")
+ except:
+ logging.exception("Unhandled exception")
+ finally:
+ queue.acquire()
+ try:
+ try:
+ job.run_op_idx = -1
+ job.end_timestamp = TimeStampNow()
+ queue.UpdateJobUnlocked(job)
+ finally:
+ job_id = job.id
+ status = job.CalcStatus()
+ finally:
+ queue.release()
+ logging.info("Worker %s finished job %s, status = %s",
+ self.worker_id, job_id, status)
+
+
+class _JobQueueWorkerPool(workerpool.WorkerPool):
+ """Simple class implementing a job-processing workerpool.