Share the jqueue lock on job-local changes
author Guido Trotter <ultrotter@google.com>
Tue, 15 Jun 2010 16:39:58 +0000 (17:39 +0100)
committer Guido Trotter <ultrotter@google.com>
Mon, 28 Jun 2010 11:04:56 +0000 (12:04 +0100)
We can share the jqueue lock when we do per-job updates. These only
conflict with updates/checks on the same job from another thread (e.g.
CancelJob and ArchiveJob, which keep the lock unshared since they are
less frequent).

Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/jqueue.py

index 77512a9..870559d 100644 (file)
@@ -429,7 +429,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
     """
-    self._queue.acquire()
+    self._queue.acquire(shared=1)
     try:
       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                  constants.OP_STATUS_CANCELING)
@@ -446,7 +446,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     finally:
       self._queue.release()
 
-  @locking.ssynchronized(_big_jqueue_lock)
+  @locking.ssynchronized(_big_jqueue_lock, shared=1)
   def _AppendFeedback(self, timestamp, log_type, log_msg):
     """Internal feedback append function, with locks
 
@@ -626,7 +626,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
@@ -646,7 +646,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
             result = proc.ExecOpCode(input_opcode,
                                      _OpExecCallbacks(queue, job, op))
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               op.status = constants.OP_STATUS_SUCCESS
               op.result = result
@@ -661,7 +661,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
             # Will be handled further up
             raise
           except Exception, err:
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
@@ -679,7 +679,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
             raise
 
       except CancelJob:
-        queue.acquire()
+        queue.acquire(shared=1)
         try:
           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                 "Job canceled by request")
@@ -690,7 +690,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       except:
         logging.exception("Unhandled exception")
     finally:
-      queue.acquire()
+      queue.acquire(shared=1)
       try:
         try:
           job.lock_status = None