hooks: Provide variables with post-opcode values
[ganeti-local] / lib / jqueue.py
index f8f9fd8..56f7a66 100644 (file)
@@ -29,7 +29,6 @@ used by all other classes in this module.
 
 """
 
-import os
 import logging
 import errno
 import re
@@ -376,6 +375,8 @@ class _QueuedJob(object):
         row.append(self.id)
       elif fname == "status":
         row.append(self.CalcStatus())
+      elif fname == "priority":
+        row.append(self.CalcPriority())
       elif fname == "ops":
         row.append([op.input.__getstate__() for op in self.ops])
       elif fname == "opresult":
@@ -390,6 +391,8 @@ class _QueuedJob(object):
         row.append([op.exec_timestamp for op in self.ops])
       elif fname == "opend":
         row.append([op.end_timestamp for op in self.ops])
+      elif fname == "oppriority":
+        row.append([op.priority for op in self.ops])
       elif fname == "received_ts":
         row.append(self.received_timestamp)
       elif fname == "start_ts":
@@ -891,13 +894,28 @@ class _JobProcessor(object):
 
     """
     assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
 
-    op.status = constants.OP_STATUS_WAITLOCK
     op.result = None
-    op.start_timestamp = TimeStampNow()
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
 
     if job.start_timestamp is None:
       job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
 
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
@@ -917,8 +935,16 @@ class _JobProcessor(object):
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
-      assert op.status == constants.OP_STATUS_WAITLOCK
-      return (constants.OP_STATUS_QUEUED, None)
+
+      assert op.status in (constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
@@ -953,6 +979,7 @@ class _JobProcessor(object):
       # Is a previous opcode still pending?
       if job.cur_opctx:
         opctx = job.cur_opctx
+        job.cur_opctx = None
       else:
         if __debug__ and _nextop_fn:
           _nextop_fn()
@@ -962,25 +989,31 @@ class _JobProcessor(object):
 
       # Consistency check
       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELING,
                                      constants.OP_STATUS_CANCELED)
-                        for i in job.ops[opctx.index:])
+                        for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                            constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING,
                            constants.OP_STATUS_CANCELED)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
               op.priority >= constants.OP_PRIO_HIGHEST)
 
-      if op.status != constants.OP_STATUS_CANCELED:
+      if op.status not in (constants.OP_STATUS_CANCELING,
+                           constants.OP_STATUS_CANCELED):
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITLOCK)
+
         # Prepare to start opcode
-        self._MarkWaitlock(job, op)
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
 
         assert op.status == constants.OP_STATUS_WAITLOCK
         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
-        # Write to disk
-        queue.UpdateJobUnlocked(job)
+        assert job.start_timestamp and op.start_timestamp
 
         logging.info("%s: opcode %s waiting for locks",
                      opctx.log_prefix, opctx.summary)
@@ -994,7 +1027,7 @@ class _JobProcessor(object):
         op.status = op_status
         op.result = op_result
 
-        if op.status == constants.OP_STATUS_QUEUED:
+        if op.status == constants.OP_STATUS_WAITLOCK:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1007,10 +1040,12 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_QUEUED:
+      if op.status == constants.OP_STATUS_WAITLOCK:
         finalize = False
 
-        opctx.CheckPriorityIncrease()
+        if opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
 
         # Keep around for another round
         job.cur_opctx = opctx
@@ -1019,9 +1054,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
-        queue.UpdateJobUnlocked(job)
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1196,7 +1229,7 @@ class JobQueue(object):
 
     self._queue_size = 0
     self._UpdateQueueSizeUnlocked()
-    self._drained = self._IsQueueMarkedDrain()
+    self._drained = jstore.CheckDrainFlag()
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1238,15 +1271,22 @@ class JobQueue(object):
 
       status = job.CalcStatus()
 
-      if status in (constants.JOB_STATUS_QUEUED, ):
+      if status == constants.JOB_STATUS_QUEUED:
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              "Unclean master daemon shutdown")
+
+        if status == constants.JOB_STATUS_WAITLOCK:
+          # Restart job
+          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+          restartjobs.append(job)
+        else:
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
+
         self.UpdateJobUnlocked(job)
 
     if restartjobs:
@@ -1588,19 +1628,6 @@ class JobQueue(object):
       logging.exception("Can't load/parse job %s", job_id)
       return None
 
-  @staticmethod
-  def _IsQueueMarkedDrain():
-    """Check if the queue is marked from drain.
-
-    This currently uses the queue drain file, which makes it a
-    per-node flag. In the future this can be moved to the config file.
-
-    @rtype: boolean
-    @return: True of the job queue is marked for draining
-
-    """
-    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
   def _UpdateQueueSizeUnlocked(self):
     """Update the queue size.
 
@@ -1616,13 +1643,7 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
-    getents = runtime.GetEnts()
-
-    if drain_flag:
-      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
-                      uid=getents.masterd_uid, gid=getents.masterd_gid)
-    else:
-      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag