jqueue: Fix deadlock between job queue and dependency manager
authorMichael Hanselmann <hansmi@google.com>
Mon, 19 Dec 2011 15:26:55 +0000 (16:26 +0100)
committerMichael Hanselmann <hansmi@google.com>
Wed, 21 Dec 2011 14:35:50 +0000 (15:35 +0100)
When an opcode is about to be processed its dependencies are
evaluated using “_JobDependencyManager.CheckAndRegister”. Due
to its nature that function requires a lock on the manager's
internal structures. All of this happens while the job queue
lock is held in shared mode (required for the job processor).

When a job has been processed any pending dependencies are re-added
to the job workerpool. Before this patch that would require
the manager's lock and then, for adding the jobs, the job queue
lock. Since this is in reverse order it will lead to deadlocks.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/jqueue.py
test/ganeti.jqueue_unittest.py

index d5ea3cb..cc06b70 100644 (file)
@@ -1439,28 +1439,39 @@ class _JobDependencyManager:
               " not one of '%s' as required" %
               (dep_job_id, status, utils.CommaJoin(dep_status)))
 
-  @locking.ssynchronized(_LOCK)
+  def _RemoveEmptyWaitersUnlocked(self):
+    """Remove all jobs without actual waiters.
+
+    """
+    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+                   if not waiters]:
+      del self._waiters[job_id]
+
   def NotifyWaiters(self, job_id):
     """Notifies all jobs waiting for a certain job ID.
 
+    @important: Do not call until L{CheckAndRegister} returned a status other
+      than L{self.WAIT} for C{job_id}, or behaviour is undefined
     @type job_id: string
     @param job_id: Job ID
 
     """
     assert ht.TString(job_id)
 
-    jobs = self._waiters.pop(job_id, None)
+    self._lock.acquire()
+    try:
+      self._RemoveEmptyWaitersUnlocked()
+
+      jobs = self._waiters.pop(job_id, None)
+    finally:
+      self._lock.release()
+
     if jobs:
       # Re-add jobs to workerpool
       logging.debug("Re-adding %s jobs which were waiting for job %s",
                     len(jobs), job_id)
       self._enqueue_fn(jobs)
 
-    # Remove all jobs without actual waiters
-    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
-                   if not waiters]:
-      del self._waiters[job_id]
-
 
 def _RequireOpenQueue(fn):
   """Decorator for "public" functions.
index 45c2984..f2ffc8a 100755 (executable)
@@ -1874,6 +1874,9 @@ class TestJobDependencyManager(unittest.TestCase):
     return result
 
   def _Enqueue(self, jobs):
+    self.assertFalse(self.jdm._lock.is_owned(),
+                     msg=("Must not own manager lock while re-adding jobs"
+                          " (potential deadlock)"))
     self._queue.append(jobs)
 
   def testNotFinalizedThenCancel(self):