jqueue: Fix deadlock between job queue and dependency manager
[ganeti-local] / test / ganeti.jqueue_unittest.py
index a0dd025..f2ffc8a 100755 (executable)
@@ -28,6 +28,7 @@ import tempfile
 import shutil
 import errno
 import itertools
+import random
 
 from ganeti import constants
 from ganeti import utils
@@ -36,6 +37,7 @@ from ganeti import jqueue
 from ganeti import opcodes
 from ganeti import compat
 from ganeti import mcpu
+from ganeti import query
 
 import testutils
 
@@ -43,6 +45,7 @@ import testutils
 class _FakeJob:
   def __init__(self, job_id, status):
     self.id = job_id
+    self.writable = False
     self._status = status
     self._log = []
 
@@ -200,7 +203,7 @@ class TestWaitForJobChangesHelper(unittest.TestCase):
     shutil.rmtree(self.tmpdir)
 
   def _LoadWaitingJob(self):
-    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
+    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
 
   def _LoadLostJob(self):
     return None
@@ -210,13 +213,13 @@ class TestWaitForJobChangesHelper(unittest.TestCase):
 
     # No change
     self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
-                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
+                          [constants.JOB_STATUS_WAITING], None, 0.1),
                      constants.JOB_NOTCHANGED)
 
     # No previous information
     self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                           ["status"], None, None, 1.0),
-                     ([constants.JOB_STATUS_WAITLOCK], []))
+                     ([constants.JOB_STATUS_WAITING], []))
 
   def testLostJob(self):
     wfjc = jqueue._WaitForJobChangesHelper()
@@ -279,7 +282,7 @@ class TestQueuedOpCode(unittest.TestCase):
 class TestQueuedJob(unittest.TestCase):
   def test(self):
     self.assertRaises(errors.GenericError, jqueue._QueuedJob,
-                      None, 1, [])
+                      None, 1, [], False)
 
   def testDefaults(self):
     job_id = 4260
@@ -289,6 +292,7 @@ class TestQueuedJob(unittest.TestCase):
       ]
 
     def _Check(job):
+      self.assertTrue(job.writable)
       self.assertEqual(job.id, job_id)
       self.assertEqual(job.log_serial, 0)
       self.assert_(job.received_timestamp)
@@ -305,12 +309,19 @@ class TestQueuedJob(unittest.TestCase):
       self.assertEqual(job.GetInfo(["summary"]),
                        [[op.input.Summary() for op in job.ops]])
 
-    job1 = jqueue._QueuedJob(None, job_id, ops)
+    job1 = jqueue._QueuedJob(None, job_id, ops, True)
     _Check(job1)
-    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
+    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
     _Check(job2)
     self.assertEqual(job1.Serialize(), job2.Serialize())
 
+  def testWritable(self):
+    """Check that jobs expose the "writable" flag given on creation."""
+    for writable in [False, True]:
+      job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], writable)
+      # Only truthiness is compared, as assertFalse/assertTrue would do
+      self.assertEqual(bool(job.writable), writable)
+
   def testPriority(self):
     job_id = 4283
     ops = [
@@ -323,7 +334,7 @@ class TestQueuedJob(unittest.TestCase):
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(repr(job).startswith("<"))
 
-    job = jqueue._QueuedJob(None, job_id, ops)
+    job = jqueue._QueuedJob(None, job_id, ops, True)
     _Check(job)
     self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                             for op in job.ops))
@@ -355,12 +366,12 @@ class TestQueuedJob(unittest.TestCase):
                               for op in ops))
 
     def _Waitlock1(ops):
-      ops[0].status = constants.OP_STATUS_WAITLOCK
+      ops[0].status = constants.OP_STATUS_WAITING
 
     def _Waitlock2(ops):
       ops[0].status = constants.OP_STATUS_SUCCESS
       ops[1].status = constants.OP_STATUS_SUCCESS
-      ops[2].status = constants.OP_STATUS_WAITLOCK
+      ops[2].status = constants.OP_STATUS_WAITING
 
     def _Running(ops):
       ops[0].status = constants.OP_STATUS_SUCCESS
@@ -399,7 +410,7 @@ class TestQueuedJob(unittest.TestCase):
 
     tests = {
       constants.JOB_STATUS_QUEUED: [_Queued],
-      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
+      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
       constants.JOB_STATUS_RUNNING: [_Running],
       constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
       constants.JOB_STATUS_CANCELED: [_Canceled],
@@ -409,7 +420,8 @@ class TestQueuedJob(unittest.TestCase):
 
     def _NewJob():
       job = jqueue._QueuedJob(None, 1,
-                              [opcodes.OpTestDelay() for _ in range(10)])
+                              [opcodes.OpTestDelay() for _ in range(10)],
+                              True)
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
                               for op in job.ops))
@@ -424,14 +436,71 @@ class TestQueuedJob(unittest.TestCase):
         self.assertEqual(job.CalcStatus(), status)
 
 
-class _FakeQueueForProc:
+class _FakeDependencyManager:
+  """Fake dependency manager replaying results queued by the test."""
   def __init__(self):
+    self._waiting = set()
+    self._notifications = []
+    self._checks = []
+
+  def AddCheckResult(self, job, dep_job_id, dep_status, result):
+    self._checks.append((job, dep_job_id, dep_status, result))
+
+  def CountPendingResults(self):
+    return len(self._checks)
+
+  def CountWaitingJobs(self):
+    return len(self._waiting)
+
+  def GetNextNotification(self):
+    return self._notifications.pop(0)
+
+  def JobWaiting(self, job):
+    return job in self._waiting
+
+  def CheckAndRegister(self, job, dep_job_id, dep_status):
+    (want_job, want_dep_id, want_status, answer) = self._checks.pop(0)
+    # The call must match the oldest queued check exactly
+    assert want_job == job
+    assert want_dep_id == dep_job_id
+    assert want_status == dep_status
+
+    (verdict, _) = answer
+    if verdict == jqueue._JobDependencyManager.WAIT:
+      self._waiting.add(job)
+    elif verdict == jqueue._JobDependencyManager.CONTINUE:
+      self._waiting.remove(job)
+
+    return answer
+
+  def NotifyWaiters(self, job_id):
+    self._notifications.append(job_id)
+
+
+class _DisabledFakeDependencyManager:  # fallback when no depmgr is passed
+  def JobWaiting(self, _):
+    return False  # reports every job as not waiting
+
+  def CheckAndRegister(self, *args):
+    assert False, "Should not be called"
+
+  def NotifyWaiters(self, _):
+    pass  # notifications are ignored
+
+
+class _FakeQueueForProc:
+  def __init__(self, depmgr=None):
     self._acquired = False
     self._updates = []
     self._submitted = []
 
     self._submit_count = itertools.count(1000)
 
+    if depmgr:
+      self.depmgr = depmgr
+    else:
+      self.depmgr = _DisabledFakeDependencyManager()
+
   def IsAcquired(self):
     return self._acquired
 
@@ -493,7 +562,7 @@ class _FakeExecOpCodeForProc:
 
 class _JobProcessorTestUtils:
   def _CreateJob(self, queue, job_id, ops):
-    job = jqueue._QueuedJob(queue, job_id, ops)
+    job = jqueue._QueuedJob(queue, job_id, ops, True)
     self.assertFalse(job.start_timestamp)
     self.assertFalse(job.end_timestamp)
     self.assertEqual(len(ops), len(job.ops))
@@ -535,7 +604,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
         self.assertRaises(IndexError, queue.GetNextUpdate)
         self.assertFalse(queue.IsAcquired())
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
         self.assertFalse(job.cur_opctx)
 
       def _AfterStart(op, cbs):
@@ -559,9 +628,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertRaises(IndexError, queue.GetNextUpdate)
         if idx == len(ops) - 1:
           # Last opcode
-          self.assert_(result)
+          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
         else:
-          self.assertFalse(result)
+          self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
           self.assert_(job.start_timestamp)
@@ -581,7 +650,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self._GenericCheckJob(job)
 
       # Calling the processor on a finished job should be a no-op
-      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                       jqueue._JobProcessor.FINISHED)
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testOpcodeError(self):
@@ -620,10 +690,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
         if idx in (failfrom, len(ops) - 1):
           # Last opcode
-          self.assert_(result)
+          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
           break
 
-        self.assertFalse(result)
+        self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
@@ -659,7 +729,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self._GenericCheckJob(job)
 
       # Calling the processor on a finished job should be a no-op
-      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                       jqueue._JobProcessor.FINISHED)
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testCancelWhileInQueue(self):
@@ -690,7 +761,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     # Simulate processor called in workerpool
     opexec = _FakeExecOpCodeForProc(queue, None, None)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -719,11 +791,11 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    job.ops[0].status = constants.OP_STATUS_WAITLOCK
+    job.ops[0].status = constants.OP_STATUS_WAITING
 
     assert len(job.ops) == 5
 
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     # Mark as cancelling
     (success, _) = job.Cancel()
@@ -735,7 +807,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                             for op in job.ops))
 
     opexec = _FakeExecOpCodeForProc(queue, None, None)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -764,7 +837,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
       # Mark as cancelled
       (success, _) = job.Cancel()
@@ -783,7 +856,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
     self.assertRaises(IndexError, queue.GetNextUpdate)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertEqual(queue.GetNextUpdate(), (job, True))
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
@@ -812,7 +886,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     def _BeforeStart(timeout, priority):
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
       # Mark as cancelled
       (success, _) = job.Cancel()
@@ -829,7 +903,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -858,7 +933,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     # Run one opcode
-    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
 
     # Job goes back to queued
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
@@ -873,7 +949,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assert_(success)
 
     # Try processing another opcode (this will actually cancel the job)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -903,7 +980,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
       for _ in range(successcount):
-        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                         jqueue._JobProcessor.DEFER)
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assertEqual(job.GetInfo(["opstatus"]),
@@ -915,7 +993,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assert_(job.ops_iter)
 
       # Serialize and restore (simulates program restart)
-      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
       self.assertFalse(newjob.ops_iter)
       self._TestPartial(newjob, successcount)
 
@@ -935,10 +1013,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if remaining == 0:
         # Last opcode
-        self.assert_(result)
+        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
         break
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
@@ -955,12 +1033,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self._GenericCheckJob(job)
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
     # ... also after being restored
-    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
+    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+    # Calling the processor on a finished job should be a no-op
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testProcessorOnRunningJob(self):
@@ -1010,7 +1091,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     def _AfterStart(op, cbs):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
@@ -1041,10 +1122,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if remaining == 0:
         # Last opcode
-        self.assert_(result)
+        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
         break
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
@@ -1059,7 +1140,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self._CheckLogMessages(job, logmsgcount)
 
     # Serialize and restore (simulates program restart)
-    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
     self._CheckLogMessages(newjob, logmsgcount)
 
     # Check each message
@@ -1125,7 +1206,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
     def _AfterStart(op, cbs):
@@ -1149,9 +1230,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertRaises(IndexError, queue.GetNextUpdate)
       if idx == len(ops) - 1:
         # Last opcode
-        self.assert_(result)
+        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
       else:
-        self.assertFalse(result)
+        self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
         self.assert_(job.start_timestamp)
@@ -1176,7 +1257,371 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self._GenericCheckJob(job)
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testJobDependency(self):
+    depmgr = _FakeDependencyManager()
+    queue = _FakeQueueForProc(depmgr=depmgr)
+
+    self.assertEqual(queue.depmgr, depmgr)
+
+    prev_job_id = 22113
+    prev_job_id2 = 28102
+    job_id = 29929
+    ops = [
+      opcodes.OpTestDummy(result="Res0", fail=False,
+                          depends=[
+                            [prev_job_id2, None],
+                            [prev_job_id, None],
+                            ]),
+      opcodes.OpTestDummy(result="Res1", fail=False),
+      ]
+
+    # Create job whose first opcode depends on two other jobs
+    job = self._CreateJob(queue, job_id, ops)
+
+    def _BeforeStart(timeout, priority):
+      if attempt == 0 or attempt > 5:
+        # Job should only be updated when it wasn't waiting for another job
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+      self.assertFalse(job.cur_opctx)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+      self.assertFalse(job.cur_opctx)
+
+      # Job is running, cancelling shouldn't be possible
+      (success, _) = job.Cancel()
+      self.assertFalse(success)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    counter = itertools.count()  # numbers the processor runs below
+    while True:
+      attempt = counter.next()
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+      if attempt < 2:  # attempts 0 and 1: first listed dependency
+        depmgr.AddCheckResult(job, prev_job_id2, None,
+                              (jqueue._JobDependencyManager.WAIT, "wait2"))
+      elif attempt == 2:
+        depmgr.AddCheckResult(job, prev_job_id2, None,
+                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
+        # The processor will ask for the next dependency immediately
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.WAIT, "wait"))
+      elif attempt < 5:  # attempts 3 and 4: second listed dependency
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.WAIT, "wait"))
+      elif attempt == 5:
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
+      if attempt == 2:
+        self.assertEqual(depmgr.CountPendingResults(), 2)
+      elif attempt > 5:
+        self.assertEqual(depmgr.CountPendingResults(), 0)
+      else:
+        self.assertEqual(depmgr.CountPendingResults(), 1)
+
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      if attempt == 0 or attempt >= 5:
+        # Job should only be updated if there was an actual change
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(depmgr.CountPendingResults())
+
+      if attempt < 5:
+        # Simulate waiting for other job
+        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
+        self.assertTrue(job.cur_opctx)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+        self.assertRaises(IndexError, depmgr.GetNextNotification)
+        self.assert_(job.start_timestamp)
+        self.assertFalse(job.end_timestamp)
+        continue
+
+      if result == jqueue._JobProcessor.FINISHED:
+        # Last opcode
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
+                               for op in job.ops))
+
+    self._GenericCheckJob(job)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+    self.assertFalse(depmgr.CountPendingResults())
+    self.assertFalse(depmgr.CountWaitingJobs())
+
+    # Calling the processor on a finished job should be a no-op
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testJobDependencyCancel(self):
+    depmgr = _FakeDependencyManager()
+    queue = _FakeQueueForProc(depmgr=depmgr)
+
+    self.assertEqual(queue.depmgr, depmgr)
+
+    prev_job_id = 13623
+    job_id = 30876
+    ops = [
+      opcodes.OpTestDummy(result="Res0", fail=False),
+      opcodes.OpTestDummy(result="Res1", fail=False,
+                          depends=[
+                            [prev_job_id, None],
+                            ]),
+      opcodes.OpTestDummy(result="Res2", fail=False),
+      ]
+
+    # Create job whose second opcode depends on another job
+    job = self._CreateJob(queue, job_id, ops)
+
+    def _BeforeStart(timeout, priority):
+      if attempt == 0 or attempt > 5:
+        # Job should only be updated when it wasn't waiting for another job
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+      self.assertFalse(job.cur_opctx)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+      self.assertFalse(job.cur_opctx)
+
+      # Job is running, cancelling shouldn't be possible
+      (success, _) = job.Cancel()
+      self.assertFalse(success)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    counter = itertools.count()  # numbers the processor runs below
+    while True:
+      attempt = counter.next()
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+      if attempt == 0:
+        # This will handle the first opcode
+        pass
+      elif attempt < 4:  # attempts 1 to 3
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.WAIT, "wait"))
+      elif attempt == 4:
+        # Other job was cancelled
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
+
+      if attempt == 0:
+        self.assertEqual(depmgr.CountPendingResults(), 0)
+      else:
+        self.assertEqual(depmgr.CountPendingResults(), 1)
+
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      if attempt <= 1 or attempt >= 4:
+        # Job should only be updated if there was an actual change
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(depmgr.CountPendingResults())
+
+      if attempt > 0 and attempt < 4:
+        # Simulate waiting for other job
+        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
+        self.assertTrue(job.cur_opctx)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+        self.assertRaises(IndexError, depmgr.GetNextNotification)
+        self.assert_(job.start_timestamp)
+        self.assertFalse(job.end_timestamp)
+        continue
+
+      if result == jqueue._JobProcessor.FINISHED:
+        # Last opcode
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_CANCELED,
+                       constants.OP_STATUS_CANCELED],
+                      ["Res0", "Job canceled by request",
+                       "Job canceled by request"]])
+
+    self._GenericCheckJob(job)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+    self.assertFalse(depmgr.CountPendingResults())
+
+    # Calling the processor on a finished job should be a no-op
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testJobDependencyWrongstatus(self):
+    """Check that a WRONGSTATUS dependency result makes the job fail."""
+    depmgr = _FakeDependencyManager()
+    queue = _FakeQueueForProc(depmgr=depmgr)
+
+    self.assertEqual(queue.depmgr, depmgr)
+
+    prev_job_id = 9741
+    job_id = 11763
+    ops = [
+      opcodes.OpTestDummy(result="Res0", fail=False),
+      opcodes.OpTestDummy(result="Res1", fail=False,
+                          depends=[
+                            [prev_job_id, None],
+                            ]),
+      opcodes.OpTestDummy(result="Res2", fail=False),
+      ]
+
+    # Create job
+    job = self._CreateJob(queue, job_id, ops)
+
+    def _BeforeStart(timeout, priority):
+      if attempt == 0 or attempt > 5:
+        # Job should only be updated when it wasn't waiting for another job
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+      self.assertFalse(job.cur_opctx)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+      self.assertFalse(job.cur_opctx)
+
+      # Job is running, cancelling shouldn't be possible
+      (success, _) = job.Cancel()
+      self.assertFalse(success)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    counter = itertools.count()
+    while True:
+      attempt = counter.next()
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+      # Attempt 0 handles the first opcode, which has no dependencies
+      if 0 < attempt < 4:
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.WAIT, "wait"))
+      elif attempt == 4:
+        # Other job failed
+        depmgr.AddCheckResult(job, prev_job_id, None,
+                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
+
+      if attempt == 0:
+        self.assertEqual(depmgr.CountPendingResults(), 0)
+      else:
+        self.assertEqual(depmgr.CountPendingResults(), 1)
+
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      if attempt <= 1 or attempt >= 4:
+        # Job should only be updated if there was an actual change
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(depmgr.CountPendingResults())
+
+      if attempt > 0 and attempt < 4:
+        # Simulate waiting for other job
+        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
+        self.assertTrue(job.cur_opctx)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+        self.assertRaises(IndexError, depmgr.GetNextNotification)
+        self.assert_(job.start_timestamp)
+        self.assertFalse(job.end_timestamp)
+        continue
+
+      if result == jqueue._JobProcessor.FINISHED:
+        # Last opcode
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
+    # First opcode succeeded; the dependent opcode and its successor failed
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_ERROR,
+                       constants.OP_STATUS_ERROR]])
+
+    (opresult, ) = job.GetInfo(["opresult"])
+    self.assertEqual(len(opresult), len(ops))
+    self.assertEqual(opresult[0], "Res0")
+    self.assertTrue(errors.GetEncodedError(opresult[1]))
+    self.assertTrue(errors.GetEncodedError(opresult[2]))
+
+    self._GenericCheckJob(job)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+    self.assertFalse(depmgr.CountPendingResults())
+
+    # Calling the processor on a finished job should be a no-op
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
 
@@ -1220,7 +1665,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
     self.assertFalse(self.queue.IsAcquired())
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     ts = self.timeout_strategy
 
@@ -1353,7 +1798,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
       result = proc(_nextop_fn=self._NextOpcode)
       assert self.curop is not None
 
-      if result or self.gave_lock:
+      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
         # Got lock and/or job is done, result must've been written
         self.assertFalse(job.cur_opctx)
         self.assertEqual(self.queue.GetNextUpdate(), (job, True))
@@ -1361,11 +1806,11 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
         self.assert_(job.ops[self.curop].exec_timestamp)
 
-      if result:
+      if result == jqueue._JobProcessor.FINISHED:
         self.assertFalse(job.cur_opctx)
         break
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
       if self.curop == 0:
         self.assertEqual(job.ops[self.curop].start_timestamp,
@@ -1380,7 +1825,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                          self.timeout_strategy.NextAttempt)
         self.assertFalse(job.ops[self.curop].exec_timestamp)
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
         # If priority has changed since acquiring locks, the job must've been
         # updated
@@ -1408,9 +1853,329 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
                             for op in job.ops))
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
 
+class TestJobDependencyManager(unittest.TestCase):
+  """Tests for jqueue._JobDependencyManager using stubbed callbacks.
+
+  """
+  class _FakeJob:
+    """Minimal job stand-in exposing only a string "id" attribute."""
+    def __init__(self, job_id):
+      self.id = str(job_id)
+
+  def setUp(self):
+    # Scripted (job ID, status) pairs, consumed in order by _GetStatus
+    self._status = []
+    # Job collections passed to _Enqueue, in call order
+    self._queue = []
+    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
+
+  def _GetStatus(self, job_id):
+    """Returns the next scripted status after verifying the job ID."""
+    (exp_job_id, result) = self._status.pop(0)
+    self.assertEqual(exp_job_id, job_id)
+    return result
+
+  def _Enqueue(self, jobs):
+    """Records re-added jobs; the manager lock must not be held here."""
+    self.assertFalse(self.jdm._lock.is_owned(),
+                     msg=("Must not own manager lock while re-adding jobs"
+                          " (potential deadlock)"))
+    self._queue.append(jobs)
+
+  def testNotFinalizedThenCancel(self):
+    # Waits while the dependency runs; reports CANCEL once it is canceled
+    job = self._FakeJob(17697)
+    job_id = str(28625)
+
+    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
+    self.assertEqual(result, self.jdm.WAIT)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertTrue(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set([job]),
+      })
+    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+      ("job/28625", None, None, [("job", [job.id])])
+      ])
+
+    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
+    self.assertEqual(result, self.jdm.CANCEL)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+  def testRequireCancel(self):
+    # Canceled dependency satisfies a requirement for the canceled status
+    job = self._FakeJob(5278)
+    job_id = str(9610)
+    dep_status = [constants.JOB_STATUS_CANCELED]
+
+    self._status.append((job_id, constants.JOB_STATUS_WAITING))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
+    self.assertEqual(result, self.jdm.WAIT)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertTrue(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set([job]),
+      })
+    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+      ("job/9610", None, None, [("job", [job.id])])
+      ])
+
+    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
+    self.assertEqual(result, self.jdm.CONTINUE)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+  def testRequireError(self):
+    # Failed dependency satisfies a requirement for the error status
+    job = self._FakeJob(21459)
+    job_id = str(25519)
+    dep_status = [constants.JOB_STATUS_ERROR]
+
+    self._status.append((job_id, constants.JOB_STATUS_WAITING))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
+    self.assertEqual(result, self.jdm.WAIT)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertTrue(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set([job]),
+      })
+
+    self._status.append((job_id, constants.JOB_STATUS_ERROR))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
+    self.assertEqual(result, self.jdm.CONTINUE)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+  def testRequireMultiple(self):
+    # With all finalized statuses accepted, each one lets the job continue
+    dep_status = list(constants.JOBS_FINALIZED)
+
+    for end_status in dep_status:
+      job = self._FakeJob(21343)
+      job_id = str(14609)
+
+      self._status.append((job_id, constants.JOB_STATUS_WAITING))
+      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
+      self.assertEqual(result, self.jdm.WAIT)
+      self.assertFalse(self._status)
+      self.assertFalse(self._queue)
+      self.assertTrue(self.jdm.JobWaiting(job))
+      self.assertEqual(self.jdm._waiters, {
+        job_id: set([job]),
+        })
+      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+        ("job/14609", None, None, [("job", [job.id])])
+        ])
+
+      self._status.append((job_id, end_status))
+      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
+      self.assertEqual(result, self.jdm.CONTINUE)
+      self.assertFalse(self._status)
+      self.assertFalse(self._queue)
+      self.assertFalse(self.jdm.JobWaiting(job))
+      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+  def testNotify(self):
+    # NotifyWaiters hands the waiting job to the enqueue callback
+    job = self._FakeJob(8227)
+    job_id = str(4113)
+
+    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
+    self.assertEqual(result, self.jdm.WAIT)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertTrue(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set([job]),
+      })
+
+    self.jdm.NotifyWaiters(job_id)
+    self.assertFalse(self._status)
+    self.assertFalse(self.jdm._waiters)
+    self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertEqual(self._queue, [set([job])])
+
+  def testWrongStatus(self):
+    # Dependency ending in error while success is required: WRONGSTATUS
+    job = self._FakeJob(10102)
+    job_id = str(1271)
+
+    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                            [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(result, self.jdm.WAIT)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertTrue(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set([job]),
+      })
+
+    self._status.append((job_id, constants.JOB_STATUS_ERROR))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                            [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(result, self.jdm.WRONGSTATUS)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertFalse(self.jdm.JobWaiting(job))
+
+  def testCorrectStatus(self):
+    # Dependency ending in the required success status: CONTINUE
+    job = self._FakeJob(24273)
+    job_id = str(23885)
+
+    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                            [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(result, self.jdm.WAIT)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertTrue(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set([job]),
+      })
+
+    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                            [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(result, self.jdm.CONTINUE)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertFalse(self.jdm.JobWaiting(job))
+
+  def testFinalizedRightAway(self):
+    # Dependency already successful at registration: CONTINUE right away
+    job = self._FakeJob(224)
+    job_id = str(3081)
+
+    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
+    (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                            [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(result, self.jdm.CONTINUE)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+    self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertEqual(self.jdm._waiters, {
+      job_id: set(),
+      })
+
+    # Force cleanup
+    self.jdm.NotifyWaiters("0")
+    self.assertFalse(self.jdm._waiters)
+    self.assertFalse(self._status)
+    self.assertFalse(self._queue)
+
+  def testMultipleWaiting(self):
+    # Many jobs wait on several dependencies, resolved in random order
+    # Use a deterministic random generator
+    rnd = random.Random(21402)
+
+    job_ids = [str(i) for i in rnd.sample(range(1, 10000), 150)]
+
+    waiters = dict((job_ids.pop(),
+                    set(map(self._FakeJob,
+                            [job_ids.pop()
+                             for _ in range(rnd.randint(1, 20))])))
+                   for _ in range(10))
+
+    # Ensure there are no duplicate job IDs
+    assert not utils.FindDuplicates(list(waiters.keys()) +
+                                    [job.id
+                                     for jobs in waiters.values()
+                                     for job in jobs])
+
+    # Register all jobs as waiters
+    for job_id, job in [(job_id, job)
+                        for (job_id, jobs) in waiters.items()
+                        for job in jobs]:
+      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
+      (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                              [constants.JOB_STATUS_SUCCESS])
+      self.assertEqual(result, self.jdm.WAIT)
+      self.assertFalse(self._status)
+      self.assertFalse(self._queue)
+      self.assertTrue(self.jdm.JobWaiting(job))
+
+    self.assertEqual(self.jdm._waiters, waiters)
+
+    def _MakeSet(entry):
+      # Pending names become sets so comparisons ignore their order
+      (name, mode, owner_names, pending) = entry
+      return (name, mode, owner_names,
+              [(pendmode, set(pend)) for (pendmode, pend) in pending])
+
+    def _CheckLockInfo():
+      # Reported pending waiters must match the local "waiters" bookkeeping
+      info = self.jdm.GetLockInfo([query.LQ_PENDING])
+      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
+        ("job/%s" % job_id, None, None,
+         [("job", set([job.id for job in jobs]))])
+        for job_id, jobs in waiters.items()
+        if jobs
+        ]))
+
+    _CheckLockInfo()
+
+    # Notify in random order
+    for job_id in rnd.sample(list(waiters), len(waiters)):
+      # Remove from pending waiter list
+      jobs = waiters.pop(job_id)
+      for job in jobs:
+        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
+        (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                                [constants.JOB_STATUS_SUCCESS])
+        self.assertEqual(result, self.jdm.CONTINUE)
+        self.assertFalse(self._status)
+        self.assertFalse(self._queue)
+        self.assertFalse(self.jdm.JobWaiting(job))
+
+      _CheckLockInfo()
+
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+    assert not waiters
+
+  def testSelfDependency(self):
+    # A job naming itself as dependency is reported as an error
+    job = self._FakeJob(18937)
+
+    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
+    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
+    self.assertEqual(result, self.jdm.ERROR)
+
+  def testJobDisappears(self):
+    # A status lookup raising JobLost is reported as an error
+    job = self._FakeJob(30540)
+    job_id = str(23769)
+
+    def _FakeStatus(_):
+      raise errors.JobLost("#msg#")
+
+    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
+    (result, _) = jdm.CheckAndRegister(job, job_id, [])
+    self.assertEqual(result, jdm.ERROR)
+    self.assertFalse(jdm.JobWaiting(job))
+    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()