cli.JobExecutor: Handle empty name, allow adding job IDs
[ganeti-local] / lib / jqueue.py
index 2871c6a..d137443 100644 (file)
@@ -540,6 +540,15 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{JobQueue.SubmitManyJobs}.
+
+    """
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
+
 
 class _JobChangesChecker(object):
   def __init__(self, fields, prev_job_info, prev_log_serial):
@@ -1128,21 +1137,56 @@ class _JobQueueWorker(workerpool.BaseWorker):
     queue = job.queue
     assert queue == self.pool.queue
 
-    self.SetTaskName("Job%s" % job.id)
+    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
+    setname_fn(None)
 
     proc = mcpu.Processor(queue.context, job.id)
 
-    if not _JobProcessor(queue, proc.ExecOpCode, job)():
+    # Create wrapper for setting thread name
+    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
+                                    proc.ExecOpCode)
+
+    if not _JobProcessor(queue, wrap_execop_fn, job)():
       # Schedule again
       raise workerpool.DeferTask(priority=job.CalcPriority())
 
+  @staticmethod
+  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
+    """Updates the worker thread name to include a short summary of the opcode.
+
+    @param setname_fn: Callable setting worker thread name
+    @param execop_fn: Callable for executing opcode (usually
+                      L{mcpu.Processor.ExecOpCode})
+
+    """
+    setname_fn(op)
+    try:
+      return execop_fn(op, *args, **kwargs)
+    finally:
+      setname_fn(None)
+
+  @staticmethod
+  def _GetWorkerName(job, op):
+    """Sets the worker thread name.
+
+    @type job: L{_QueuedJob}
+    @type op: L{opcodes.OpCode}
+
+    """
+    parts = ["Job%s" % job.id]
+
+    if op:
+      parts.append(op.TinySummary())
+
+    return "/".join(parts)
+
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
   """Simple class implementing a job-processing workerpool.
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+    super(_JobQueueWorkerPool, self).__init__("Jq",
                                               JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue