# Cancel here if we were asked to
self._CheckCancel()
+ def SubmitManyJobs(self, jobs):
+ """Submits jobs for processing.
+
+ See L{JobQueue.SubmitManyJobs}.
+
+ """
+ # Locking is done in job queue
+ return self._queue.SubmitManyJobs(jobs)
+
class _JobChangesChecker(object):
def __init__(self, fields, prev_job_info, prev_log_serial):
queue = job.queue
assert queue == self.pool.queue
- self.SetTaskName("Job%s" % job.id)
+ setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
+ setname_fn(None)
proc = mcpu.Processor(queue.context, job.id)
- if not _JobProcessor(queue, proc.ExecOpCode, job)():
+ # Create wrapper for setting thread name
+ wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
+ proc.ExecOpCode)
+
+ if not _JobProcessor(queue, wrap_execop_fn, job)():
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
+ @staticmethod
+ def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
+ """Updates the worker thread name to include a short summary of the opcode.
+
+ @param setname_fn: Callable setting worker thread name
+ @param execop_fn: Callable for executing opcode (usually
+ L{mcpu.Processor.ExecOpCode})
+
+ """
+ setname_fn(op)
+ try:
+ return execop_fn(op, *args, **kwargs)
+ finally:
+ setname_fn(None)
+
+ @staticmethod
+ def _GetWorkerName(job, op):
+ """Sets the worker thread name.
+
+ @type job: L{_QueuedJob}
+ @type op: L{opcodes.OpCode}
+
+ """
+ parts = ["Job%s" % job.id]
+
+ if op:
+ parts.append(op.TinySummary())
+
+ return "/".join(parts)
+
class _JobQueueWorkerPool(workerpool.WorkerPool):
"""Simple class implementing a job-processing workerpool.
"""
def __init__(self, queue):
- super(_JobQueueWorkerPool, self).__init__("JobQueue",
+ super(_JobQueueWorkerPool, self).__init__("Jq",
JOBQUEUE_THREADS,
_JobQueueWorker)
self.queue = queue