def GetLogEntries(self, newer_than):
  """Collect log entries from all opcodes, optionally filtered by serial.

  The first element of every log entry is its serial number; only
  entries whose serial is strictly greater than the requested bound are
  returned.

  @type newer_than: None or int
  @param newer_than: exclusive lower bound for the entry serial; when
      None, -1 is used as the bound
  @rtype: list
  @return: the selected log entries, opcode by opcode, in the order in
      which they appear in each opcode's log

  """
  threshold = newer_than
  if threshold is None:
    # "No bound" is expressed as a bound below the serials in use
    threshold = -1

  return [entry
          for opcode in self.ops
          for entry in opcode.log
          if entry[0] > threshold]
+
def MarkUnfinishedOps(self, status, result):
  """Set status and result on every opcode which is not yet finalized.

  Opcodes whose status is listed in C{constants.OPS_FINALIZED} are left
  untouched; all others get the given status and result. The opcodes
  are walked in order, and it is asserted that no finalized opcode
  follows one which had to be marked.

  @param status: the status to set on the unfinished opcodes
  @param result: the result to set on the unfinished opcodes

  """
  marked_any = False
  for opcode in self.ops:
    if opcode.status not in constants.OPS_FINALIZED:
      opcode.status = status
      opcode.result = result
      marked_any = True
      continue

    # A finalized opcode is only expected before the first marked one
    assert not marked_any, \
      "Finalized opcodes found after non-finalized ones"
+
+
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks through which a running opcode updates its queued job.

  Every instance is bound to one queue, one job and one opcode.
  L{NotifyStart} and L{Feedback} hold the queue lock (via the queue's
  C{acquire}/C{release}) while they modify the job or the opcode;
  L{ReportLocks} intentionally does not.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    The job's lock status is cleared before the opcode status is
    examined, so it is reset even if the job is being canceled.

    @raise CancelJob: if the opcode status is OP_STATUS_CANCELING

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Can be called either as C{Feedback(log_msg)}, in which case the
    entry type is C{constants.ELOG_MESSAGE}, or as
    C{Feedback(log_type, log_msg)}.

    The entry is stored in the opcode's log as a tuple of
    C{(serial, timestamp, log_type, log_msg)}, where the serial is the
    job's log serial after incrementing it. Afterwards everything
    waiting on the job's C{change} object is notified.

    """
    # Exactly one or two arguments are supported. Rejecting an empty call
    # here avoids an opaque unpacking error in the two-argument branch.
    assert len(args) in (1, 2), "Feedback takes one or two arguments"

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    @param msg: the locking information to store as the job's lock status

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
+