job queue: fix loss of finalized opcode result
authorIustin Pop <iustin@google.com>
Sun, 19 Jul 2009 01:45:45 +0000 (03:45 +0200)
committerIustin Pop <iustin@google.com>
Sun, 19 Jul 2009 16:36:26 +0000 (18:36 +0200)
Currently, unclean master daemon shutdown overwrites all of a job's
opcode status and result with error/None. This is incorrect, since the
any already finished opcode(s) should have their status and result
preserved, and only not-yet-processed opcodes should be marked as
‘error’. Cancelling jobs between opcodes does the same (but this is not
allowed currently by the code, so it's not as important as unclean
shutdown).

This patch adds a new _QueuedJob function that only overwrites the
status and result of finalized opcodes, which is then used in job queue
init and in the cancel job functions. The patch also adds some comments
and a new set constants in constants.py highlighting the finalized vs.
non-finalized opcode statuses.

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>

lib/constants.py
lib/jqueue.py

index 3949f56..05226c0 100644 (file)
@@ -453,13 +453,19 @@ JOB_STATUS_CANCELED = "canceled"
 JOB_STATUS_SUCCESS = "success"
 JOB_STATUS_ERROR = "error"
 
+# OpCode status
+# not yet finalized
 OP_STATUS_QUEUED = "queued"
 OP_STATUS_WAITLOCK = "waiting"
 OP_STATUS_CANCELING = "canceling"
 OP_STATUS_RUNNING = "running"
+# finalized
 OP_STATUS_CANCELED = "canceled"
 OP_STATUS_SUCCESS = "success"
 OP_STATUS_ERROR = "error"
+OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
+                           OP_STATUS_SUCCESS,
+                           OP_STATUS_ERROR])
 
 # Execution log types
 ELOG_MESSAGE = "message"
index 083031c..20fa1c7 100644 (file)
@@ -313,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -593,9 +613,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -1157,8 +1176,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1169,9 +1187,8 @@ class JobQueue(object):
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_CANCELED
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)