cli.JobExecutor: Handle empty name, allow adding job IDs
[ganeti-local] / lib / jqueue.py
index e2b4cf4..d137443 100644 (file)
@@ -29,7 +29,6 @@ used by all other classes in this module.
 
 """
 
-import os
 import logging
 import errno
 import re
@@ -376,6 +375,8 @@ class _QueuedJob(object):
         row.append(self.id)
       elif fname == "status":
         row.append(self.CalcStatus())
+      elif fname == "priority":
+        row.append(self.CalcPriority())
       elif fname == "ops":
         row.append([op.input.__getstate__() for op in self.ops])
       elif fname == "opresult":
@@ -390,6 +391,8 @@ class _QueuedJob(object):
         row.append([op.exec_timestamp for op in self.ops])
       elif fname == "opend":
         row.append([op.end_timestamp for op in self.ops])
+      elif fname == "oppriority":
+        row.append([op.priority for op in self.ops])
       elif fname == "received_ts":
         row.append(self.received_timestamp)
       elif fname == "start_ts":
@@ -432,22 +435,19 @@ class _QueuedJob(object):
     """
     status = self.CalcStatus()
 
-    if status not in (constants.JOB_STATUS_QUEUED,
-                      constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer waiting in the queue", self.id)
-      return (False, "Job %s is no longer waiting in the queue" % self.id)
-
     if status == constants.JOB_STATUS_QUEUED:
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
-      msg = "Job %s canceled" % self.id
+      return (True, "Job %s canceled" % self.id)
 
     elif status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      msg = "Job %s will be canceled" % self.id
+      return (True, "Job %s will be canceled" % self.id)
 
-    return (True, msg)
+    else:
+      logging.debug("Job %s is no longer waiting in the queue", self.id)
+      return (False, "Job %s is no longer waiting in the queue" % self.id)
 
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
@@ -540,6 +540,15 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{JobQueue.SubmitManyJobs}.
+
+    """
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
+
 
 class _JobChangesChecker(object):
   def __init__(self, fields, prev_job_info, prev_log_serial):
@@ -889,18 +898,33 @@ class _JobProcessor(object):
 
     @type job: L{_QueuedJob}
     @param job: Job object
-    @type job: L{_QueuedOpCode}
-    @param job: Opcode object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
 
     """
     assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
 
-    op.status = constants.OP_STATUS_WAITLOCK
     op.result = None
-    op.start_timestamp = TimeStampNow()
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
 
     if job.start_timestamp is None:
       job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
 
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
@@ -920,8 +944,16 @@ class _JobProcessor(object):
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
-      assert op.status == constants.OP_STATUS_WAITLOCK
-      return (constants.OP_STATUS_QUEUED, None)
+
+      assert op.status in (constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
@@ -956,6 +988,7 @@ class _JobProcessor(object):
       # Is a previous opcode still pending?
       if job.cur_opctx:
         opctx = job.cur_opctx
+        job.cur_opctx = None
       else:
         if __debug__ and _nextop_fn:
           _nextop_fn()
@@ -965,25 +998,31 @@ class _JobProcessor(object):
 
       # Consistency check
       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELING,
                                      constants.OP_STATUS_CANCELED)
-                        for i in job.ops[opctx.index:])
+                        for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                            constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING,
                            constants.OP_STATUS_CANCELED)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
               op.priority >= constants.OP_PRIO_HIGHEST)
 
-      if op.status != constants.OP_STATUS_CANCELED:
+      if op.status not in (constants.OP_STATUS_CANCELING,
+                           constants.OP_STATUS_CANCELED):
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITLOCK)
+
         # Prepare to start opcode
-        self._MarkWaitlock(job, op)
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
 
         assert op.status == constants.OP_STATUS_WAITLOCK
         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
-        # Write to disk
-        queue.UpdateJobUnlocked(job)
+        assert job.start_timestamp and op.start_timestamp
 
         logging.info("%s: opcode %s waiting for locks",
                      opctx.log_prefix, opctx.summary)
@@ -997,7 +1036,7 @@ class _JobProcessor(object):
         op.status = op_status
         op.result = op_result
 
-        if op.status == constants.OP_STATUS_QUEUED:
+        if op.status == constants.OP_STATUS_WAITLOCK:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1010,10 +1049,12 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_QUEUED:
+      if op.status == constants.OP_STATUS_WAITLOCK:
         finalize = False
 
-        opctx.CheckPriorityIncrease()
+        if opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
 
         # Keep around for another round
         job.cur_opctx = opctx
@@ -1022,9 +1063,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
-        queue.UpdateJobUnlocked(job)
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1098,21 +1137,56 @@ class _JobQueueWorker(workerpool.BaseWorker):
     queue = job.queue
     assert queue == self.pool.queue
 
-    self.SetTaskName("Job%s" % job.id)
+    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
+    setname_fn(None)
 
     proc = mcpu.Processor(queue.context, job.id)
 
-    if not _JobProcessor(queue, proc.ExecOpCode, job)():
+    # Create wrapper for setting thread name
+    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
+                                    proc.ExecOpCode)
+
+    if not _JobProcessor(queue, wrap_execop_fn, job)():
       # Schedule again
       raise workerpool.DeferTask(priority=job.CalcPriority())
 
+  @staticmethod
+  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
+    """Updates the worker thread name to include a short summary of the opcode.
+
+    @param setname_fn: Callable setting worker thread name
+    @param execop_fn: Callable for executing opcode (usually
+                      L{mcpu.Processor.ExecOpCode})
+
+    """
+    setname_fn(op)
+    try:
+      return execop_fn(op, *args, **kwargs)
+    finally:
+      setname_fn(None)
+
+  @staticmethod
+  def _GetWorkerName(job, op):
+    """Sets the worker thread name.
+
+    @type job: L{_QueuedJob}
+    @type op: L{opcodes.OpCode}
+
+    """
+    parts = ["Job%s" % job.id]
+
+    if op:
+      parts.append(op.TinySummary())
+
+    return "/".join(parts)
+
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
   """Simple class implementing a job-processing workerpool.
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+    super(_JobQueueWorkerPool, self).__init__("Jq",
                                               JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
@@ -1199,7 +1273,7 @@ class JobQueue(object):
 
     self._queue_size = 0
     self._UpdateQueueSizeUnlocked()
-    self._drained = self._IsQueueMarkedDrain()
+    self._drained = jstore.CheckDrainFlag()
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1241,15 +1315,22 @@ class JobQueue(object):
 
       status = job.CalcStatus()
 
-      if status in (constants.JOB_STATUS_QUEUED, ):
+      if status == constants.JOB_STATUS_QUEUED:
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              "Unclean master daemon shutdown")
+
+        if status == constants.JOB_STATUS_WAITLOCK:
+          # Restart job
+          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+          restartjobs.append(job)
+        else:
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
+
         self.UpdateJobUnlocked(job)
 
     if restartjobs:
@@ -1591,19 +1672,6 @@ class JobQueue(object):
       logging.exception("Can't load/parse job %s", job_id)
       return None
 
-  @staticmethod
-  def _IsQueueMarkedDrain():
-    """Check if the queue is marked from drain.
-
-    This currently uses the queue drain file, which makes it a
-    per-node flag. In the future this can be moved to the config file.
-
-    @rtype: boolean
-    @return: True of the job queue is marked for draining
-
-    """
-    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
   def _UpdateQueueSizeUnlocked(self):
     """Update the queue size.
 
@@ -1619,13 +1687,7 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
-    getents = runtime.GetEnts()
-
-    if drain_flag:
-      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
-                      uid=getents.masterd_uid, gid=getents.masterd_gid)
-    else:
-      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
@@ -1705,7 +1767,8 @@ class JobQueue(object):
         status = True
         data = job_id
       except errors.GenericError, err:
-        data = str(err)
+        data = ("%s; opcodes %s" %
+                (err, utils.CommaJoin(op.Summary() for op in ops)))
         status = False
       results.append((status, data))