jqueue: Work around race condition between job processing and archival
author Michael Hanselmann <hansmi@google.com>
Tue, 17 Aug 2010 13:33:52 +0000 (15:33 +0200)
committer Michael Hanselmann <hansmi@google.com>
Wed, 18 Aug 2010 11:21:03 +0000 (13:21 +0200)
This is a simplified version of a patch I sent earlier to make sure the job
file is only written once with a finalized status.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/jqueue.py

index 467cada..ca75552 100644 (file)
@@ -754,6 +754,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
               op.status = constants.OP_STATUS_SUCCESS
               op.result = result
               op.end_timestamp = TimeStampNow()
+              if idx == count - 1:
+                job.lock_status = None
+                job.end_timestamp = TimeStampNow()
               queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
@@ -778,6 +781,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
               finally:
+                job.lock_status = None
+                job.end_timestamp = TimeStampNow()
                 queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
@@ -788,6 +793,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
         try:
           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                 "Job canceled by request")
+          job.lock_status = None
+          job.end_timestamp = TimeStampNow()
+          queue.UpdateJobUnlocked(job)
         finally:
           queue.release()
       except errors.GenericError, err:
@@ -795,19 +803,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
       except:
         logging.exception("Unhandled exception")
     finally:
-      queue.acquire(shared=1)
-      try:
-        try:
-          job.lock_status = None
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
-        finally:
-          job_id = job.id
-          status = job.CalcStatus()
-      finally:
-        queue.release()
-
-      logging.info("Finished job %s, status = %s", job_id, status)
+      status = job.CalcStatus()
+      logging.info("Finished job %s, status = %s", job.id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):