Merge branch 'devel-2.5'
[ganeti-local] / test / ganeti.jqueue_unittest.py
index 87481f3..45c2984 100755 (executable)
@@ -28,6 +28,7 @@ import tempfile
 import shutil
 import errno
 import itertools
+import random
 
 from ganeti import constants
 from ganeti import utils
@@ -36,6 +37,7 @@ from ganeti import jqueue
 from ganeti import opcodes
 from ganeti import compat
 from ganeti import mcpu
+from ganeti import query
 
 import testutils
 
@@ -201,7 +203,7 @@ class TestWaitForJobChangesHelper(unittest.TestCase):
     shutil.rmtree(self.tmpdir)
 
   def _LoadWaitingJob(self):
-    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
+    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
 
   def _LoadLostJob(self):
     return None
@@ -211,13 +213,13 @@ class TestWaitForJobChangesHelper(unittest.TestCase):
 
     # No change
     self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
-                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
+                          [constants.JOB_STATUS_WAITING], None, 0.1),
                      constants.JOB_NOTCHANGED)
 
     # No previous information
     self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                           ["status"], None, None, 1.0),
-                     ([constants.JOB_STATUS_WAITLOCK], []))
+                     ([constants.JOB_STATUS_WAITING], []))
 
   def testLostJob(self):
     wfjc = jqueue._WaitForJobChangesHelper()
@@ -364,12 +366,12 @@ class TestQueuedJob(unittest.TestCase):
                               for op in ops))
 
     def _Waitlock1(ops):
-      ops[0].status = constants.OP_STATUS_WAITLOCK
+      ops[0].status = constants.OP_STATUS_WAITING
 
     def _Waitlock2(ops):
       ops[0].status = constants.OP_STATUS_SUCCESS
       ops[1].status = constants.OP_STATUS_SUCCESS
-      ops[2].status = constants.OP_STATUS_WAITLOCK
+      ops[2].status = constants.OP_STATUS_WAITING
 
     def _Running(ops):
       ops[0].status = constants.OP_STATUS_SUCCESS
@@ -408,7 +410,7 @@ class TestQueuedJob(unittest.TestCase):
 
     tests = {
       constants.JOB_STATUS_QUEUED: [_Queued],
-      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
+      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
       constants.JOB_STATUS_RUNNING: [_Running],
       constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
       constants.JOB_STATUS_CANCELED: [_Canceled],
@@ -602,7 +604,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
         self.assertRaises(IndexError, queue.GetNextUpdate)
         self.assertFalse(queue.IsAcquired())
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
         self.assertFalse(job.cur_opctx)
 
       def _AfterStart(op, cbs):
@@ -626,9 +628,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertRaises(IndexError, queue.GetNextUpdate)
         if idx == len(ops) - 1:
           # Last opcode
-          self.assert_(result)
+          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
         else:
-          self.assertFalse(result)
+          self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
           self.assert_(job.start_timestamp)
@@ -648,7 +650,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self._GenericCheckJob(job)
 
       # Calling the processor on a finished job should be a no-op
-      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                       jqueue._JobProcessor.FINISHED)
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testOpcodeError(self):
@@ -687,10 +690,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
         if idx in (failfrom, len(ops) - 1):
           # Last opcode
-          self.assert_(result)
+          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
           break
 
-        self.assertFalse(result)
+        self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
@@ -726,7 +729,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self._GenericCheckJob(job)
 
       # Calling the processor on a finished job should be a no-op
-      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                       jqueue._JobProcessor.FINISHED)
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testCancelWhileInQueue(self):
@@ -757,7 +761,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     # Simulate processor called in workerpool
     opexec = _FakeExecOpCodeForProc(queue, None, None)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -786,11 +791,11 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    job.ops[0].status = constants.OP_STATUS_WAITLOCK
+    job.ops[0].status = constants.OP_STATUS_WAITING
 
     assert len(job.ops) == 5
 
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     # Mark as cancelling
     (success, _) = job.Cancel()
@@ -802,7 +807,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                             for op in job.ops))
 
     opexec = _FakeExecOpCodeForProc(queue, None, None)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -831,7 +837,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
       # Mark as cancelled
       (success, _) = job.Cancel()
@@ -850,7 +856,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
     self.assertRaises(IndexError, queue.GetNextUpdate)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertEqual(queue.GetNextUpdate(), (job, True))
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
@@ -879,7 +886,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     def _BeforeStart(timeout, priority):
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
       # Mark as cancelled
       (success, _) = job.Cancel()
@@ -896,7 +903,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -925,7 +933,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     # Run one opcode
-    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
 
     # Job goes back to queued
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
@@ -940,7 +949,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assert_(success)
 
     # Try processing another opcode (this will actually cancel the job)
-    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -970,7 +980,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
       for _ in range(successcount):
-        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                         jqueue._JobProcessor.DEFER)
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assertEqual(job.GetInfo(["opstatus"]),
@@ -1002,10 +1013,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if remaining == 0:
         # Last opcode
-        self.assert_(result)
+        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
         break
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
@@ -1022,13 +1033,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self._GenericCheckJob(job)
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
     # ... also after being restored
     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testProcessorOnRunningJob(self):
@@ -1078,7 +1091,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     def _AfterStart(op, cbs):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
@@ -1109,10 +1122,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if remaining == 0:
         # Last opcode
-        self.assert_(result)
+        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
         break
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
@@ -1193,7 +1206,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
     def _AfterStart(op, cbs):
@@ -1217,9 +1230,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertRaises(IndexError, queue.GetNextUpdate)
       if idx == len(ops) - 1:
         # Last opcode
-        self.assert_(result)
+        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
       else:
-        self.assertFalse(result)
+        self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
         self.assert_(job.start_timestamp)
@@ -1244,7 +1257,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self._GenericCheckJob(job)
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testJobDependency(self):
@@ -1274,7 +1288,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
     def _AfterStart(op, cbs):
@@ -1331,23 +1345,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if attempt < 5:
         # Simulate waiting for other job
-        self.assertTrue(result)
+        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
         self.assertTrue(job.cur_opctx)
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
         self.assertRaises(IndexError, depmgr.GetNextNotification)
         self.assert_(job.start_timestamp)
         self.assertFalse(job.end_timestamp)
         continue
 
-      if result:
+      if result == jqueue._JobProcessor.FINISHED:
         # Last opcode
         self.assertFalse(job.cur_opctx)
-        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
         break
 
       self.assertRaises(IndexError, depmgr.GetNextNotification)
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(job.start_timestamp)
       self.assertFalse(job.end_timestamp)
@@ -1369,7 +1382,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertFalse(depmgr.CountWaitingJobs())
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testJobDependencyCancel(self):
@@ -1398,7 +1412,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
     def _AfterStart(op, cbs):
@@ -1449,23 +1463,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if attempt > 0 and attempt < 4:
         # Simulate waiting for other job
-        self.assertTrue(result)
+        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
         self.assertTrue(job.cur_opctx)
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
         self.assertRaises(IndexError, depmgr.GetNextNotification)
         self.assert_(job.start_timestamp)
         self.assertFalse(job.end_timestamp)
         continue
 
-      if result:
+      if result == jqueue._JobProcessor.FINISHED:
         # Last opcode
         self.assertFalse(job.cur_opctx)
-        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
         break
 
       self.assertRaises(IndexError, depmgr.GetNextNotification)
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(job.start_timestamp)
       self.assertFalse(job.end_timestamp)
@@ -1486,7 +1499,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertFalse(depmgr.CountPendingResults())
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testJobDependencyWrongstatus(self):
@@ -1515,7 +1529,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
     def _AfterStart(op, cbs):
@@ -1566,23 +1580,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       if attempt > 0 and attempt < 4:
         # Simulate waiting for other job
-        self.assertTrue(result)
+        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
         self.assertTrue(job.cur_opctx)
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
         self.assertRaises(IndexError, depmgr.GetNextNotification)
         self.assert_(job.start_timestamp)
         self.assertFalse(job.end_timestamp)
         continue
 
-      if result:
+      if result == jqueue._JobProcessor.FINISHED:
         # Last opcode
         self.assertFalse(job.cur_opctx)
-        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
         break
 
       self.assertRaises(IndexError, depmgr.GetNextNotification)
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(job.start_timestamp)
       self.assertFalse(job.end_timestamp)
@@ -1607,7 +1620,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertFalse(depmgr.CountPendingResults())
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
 
@@ -1651,7 +1665,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
     self.assertFalse(self.queue.IsAcquired())
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     ts = self.timeout_strategy
 
@@ -1784,7 +1798,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
       result = proc(_nextop_fn=self._NextOpcode)
       assert self.curop is not None
 
-      if result or self.gave_lock:
+      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
         # Got lock and/or job is done, result must've been written
         self.assertFalse(job.cur_opctx)
         self.assertEqual(self.queue.GetNextUpdate(), (job, True))
@@ -1792,11 +1806,11 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
         self.assert_(job.ops[self.curop].exec_timestamp)
 
-      if result:
+      if result == jqueue._JobProcessor.FINISHED:
         self.assertFalse(job.cur_opctx)
         break
 
-      self.assertFalse(result)
+      self.assertEqual(result, jqueue._JobProcessor.DEFER)
 
       if self.curop == 0:
         self.assertEqual(job.ops[self.curop].start_timestamp,
@@ -1811,7 +1825,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                          self.timeout_strategy.NextAttempt)
         self.assertFalse(job.ops[self.curop].exec_timestamp)
-        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
         # If priority has changed since acquiring locks, the job must've been
         # updated
@@ -1839,7 +1853,8 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
                             for op in job.ops))
 
     # Calling the processor on a finished job should be a no-op
-    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
 
@@ -1874,6 +1889,9 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertEqual(self.jdm._waiters, {
       job_id: set([job]),
       })
+    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+      ("job/28625", None, None, [("job", [job.id])])
+      ])
 
     self._status.append((job_id, constants.JOB_STATUS_CANCELED))
     (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
@@ -1881,13 +1899,14 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self._status)
     self.assertFalse(self._queue)
     self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testRequireCancel(self):
     job = self._FakeJob(5278)
     job_id = str(9610)
     dep_status = [constants.JOB_STATUS_CANCELED]
 
-    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
+    self._status.append((job_id, constants.JOB_STATUS_WAITING))
     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
     self.assertEqual(result, self.jdm.WAIT)
     self.assertFalse(self._status)
@@ -1896,6 +1915,9 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertEqual(self.jdm._waiters, {
       job_id: set([job]),
       })
+    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+      ("job/9610", None, None, [("job", [job.id])])
+      ])
 
     self._status.append((job_id, constants.JOB_STATUS_CANCELED))
     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
@@ -1903,13 +1925,14 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self._status)
     self.assertFalse(self._queue)
     self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testRequireError(self):
     job = self._FakeJob(21459)
     job_id = str(25519)
     dep_status = [constants.JOB_STATUS_ERROR]
 
-    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
+    self._status.append((job_id, constants.JOB_STATUS_WAITING))
     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
     self.assertEqual(result, self.jdm.WAIT)
     self.assertFalse(self._status)
@@ -1925,6 +1948,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self._status)
     self.assertFalse(self._queue)
     self.assertFalse(self.jdm.JobWaiting(job))
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testRequireMultiple(self):
     dep_status = list(constants.JOBS_FINALIZED)
@@ -1933,7 +1957,7 @@ class TestJobDependencyManager(unittest.TestCase):
       job = self._FakeJob(21343)
       job_id = str(14609)
 
-      self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
+      self._status.append((job_id, constants.JOB_STATUS_WAITING))
       (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
       self.assertEqual(result, self.jdm.WAIT)
       self.assertFalse(self._status)
@@ -1942,6 +1966,9 @@ class TestJobDependencyManager(unittest.TestCase):
       self.assertEqual(self.jdm._waiters, {
         job_id: set([job]),
         })
+      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+        ("job/14609", None, None, [("job", [job.id])])
+        ])
 
       self._status.append((job_id, end_status))
       (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
@@ -1949,6 +1976,7 @@ class TestJobDependencyManager(unittest.TestCase):
       self.assertFalse(self._status)
       self.assertFalse(self._queue)
       self.assertFalse(self.jdm.JobWaiting(job))
+      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testNotify(self):
     job = self._FakeJob(8227)
@@ -2037,6 +2065,72 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self._status)
     self.assertFalse(self._queue)
 
+  def testMultipleWaiting(self):
+    # Use a deterministic random generator
+    rnd = random.Random(21402)
+
+    job_ids = map(str, rnd.sample(range(1, 10000), 150))
+
+    waiters = dict((job_ids.pop(),
+                    set(map(self._FakeJob,
+                            [job_ids.pop()
+                             for _ in range(rnd.randint(1, 20))])))
+                   for _ in range(10))
+
+    # Ensure there are no duplicate job IDs
+    assert not utils.FindDuplicates(waiters.keys() +
+                                    [job.id
+                                     for jobs in waiters.values()
+                                     for job in jobs])
+
+    # Register all jobs as waiters
+    for job_id, job in [(job_id, job)
+                        for (job_id, jobs) in waiters.items()
+                        for job in jobs]:
+      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
+      (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                              [constants.JOB_STATUS_SUCCESS])
+      self.assertEqual(result, self.jdm.WAIT)
+      self.assertFalse(self._status)
+      self.assertFalse(self._queue)
+      self.assertTrue(self.jdm.JobWaiting(job))
+
+    self.assertEqual(self.jdm._waiters, waiters)
+
+    def _MakeSet((name, mode, owner_names, pending)):
+      return (name, mode, owner_names,
+              [(pendmode, set(pend)) for (pendmode, pend) in pending])
+
+    def _CheckLockInfo():
+      info = self.jdm.GetLockInfo([query.LQ_PENDING])
+      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
+        ("job/%s" % job_id, None, None,
+         [("job", set([job.id for job in jobs]))])
+        for job_id, jobs in waiters.items()
+        if jobs
+        ]))
+
+    _CheckLockInfo()
+
+    # Notify in random order
+    for job_id in rnd.sample(waiters, len(waiters)):
+      # Remove from pending waiter list
+      jobs = waiters.pop(job_id)
+      for job in jobs:
+        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
+        (result, _) = self.jdm.CheckAndRegister(job, job_id,
+                                                [constants.JOB_STATUS_SUCCESS])
+        self.assertEqual(result, self.jdm.CONTINUE)
+        self.assertFalse(self._status)
+        self.assertFalse(self._queue)
+        self.assertFalse(self.jdm.JobWaiting(job))
+
+      _CheckLockInfo()
+
+    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+    assert not waiters
+
   def testSelfDependency(self):
     job = self._FakeJob(18937)
 
@@ -2055,6 +2149,7 @@ class TestJobDependencyManager(unittest.TestCase):
     (result, _) = jdm.CheckAndRegister(job, job_id, [])
     self.assertEqual(result, self.jdm.ERROR)
     self.assertFalse(jdm.JobWaiting(job))
+    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
 
 
 if __name__ == "__main__":