Add unittest for cli.FormatResultError
[ganeti-local] / test / ganeti.jqueue_unittest.py
index d573378..9137927 100755 (executable)
@@ -27,6 +27,7 @@ import unittest
 import tempfile
 import shutil
 import errno
+import itertools
 
 from ganeti import constants
 from ganeti import utils
@@ -34,6 +35,7 @@ from ganeti import errors
 from ganeti import jqueue
 from ganeti import opcodes
 from ganeti import compat
+from ganeti import mcpu
 
 import testutils
 
@@ -266,7 +268,7 @@ class TestQueuedOpCode(unittest.TestCase):
       self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
       self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
 
-    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
+    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
     op1 = jqueue._QueuedOpCode(inpop)
     _Check(op1)
     op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
@@ -282,7 +284,7 @@ class TestQueuedJob(unittest.TestCase):
   def testDefaults(self):
     job_id = 4260
     ops = [
-      opcodes.OpGetTags(),
+      opcodes.OpTagsGet(),
       opcodes.OpTestDelay(),
       ]
 
@@ -312,7 +314,7 @@ class TestQueuedJob(unittest.TestCase):
   def testPriority(self):
     job_id = 4283
     ops = [
-      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
+      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
       opcodes.OpTestDelay(),
       ]
 
@@ -425,10 +427,14 @@ class TestQueuedJob(unittest.TestCase):
 class _FakeQueueForProc:
   def __init__(self):
     self._acquired = False
+    self._updates = []
 
   def IsAcquired(self):
     return self._acquired
 
+  def GetNextUpdate(self):
+    return self._updates.pop(0)
+
   def acquire(self, shared=0):
     assert shared == 1
     self._acquired = True
@@ -437,34 +443,40 @@ class _FakeQueueForProc:
     assert self._acquired
     self._acquired = False
 
-  def UpdateJobUnlocked(self, job, replicate=None):
-    # TODO: Ensure job is updated at the correct places
-    pass
+  def UpdateJobUnlocked(self, job, replicate=True):
+    assert self._acquired, "Lock not acquired while updating job"
+    self._updates.append((job, bool(replicate)))
 
 
 class _FakeExecOpCodeForProc:
-  def __init__(self, before_start, after_start):
+  def __init__(self, queue, before_start, after_start):
+    self._queue = queue
     self._before_start = before_start
     self._after_start = after_start
 
-  def __call__(self, op, cbs):
+  def __call__(self, op, cbs, timeout=None, priority=None):
     assert isinstance(op, opcodes.OpTestDummy)
+    assert not self._queue.IsAcquired(), \
+           "Queue lock not released when executing opcode"
 
     if self._before_start:
-      self._before_start()
+      self._before_start(timeout, priority)
 
     cbs.NotifyStart()
 
     if self._after_start:
       self._after_start(op, cbs)
 
+    # Check again after the callbacks
+    assert not self._queue.IsAcquired()
+
     if op.fail:
       raise errors.OpExecError("Error requested (%s)" % op.result)
 
     return op.result
 
 
-class TestJobProcessor(unittest.TestCase):
+class _JobProcessorTestUtils:
   def _CreateJob(self, queue, job_id, ops):
     job = jqueue._QueuedJob(queue, job_id, ops)
     self.assertFalse(job.start_timestamp)
@@ -475,6 +487,8 @@ class TestJobProcessor(unittest.TestCase):
     self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
     return job
 
+
+class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
   def _GenericCheckJob(self, job):
     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                       for op in job.ops)
@@ -502,22 +516,32 @@ class TestJobProcessor(unittest.TestCase):
       # Create job
       job = self._CreateJob(queue, job_id, ops)
 
-      def _BeforeStart():
+      def _BeforeStart(timeout, priority):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertFalse(job.cur_opctx)
 
       def _AfterStart(op, cbs):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+        self.assertFalse(job.cur_opctx)
 
         # Job is running, cancelling shouldn't be possible
         (success, _) = job.Cancel()
         self.assertFalse(success)
 
-      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
       for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         result = jqueue._JobProcessor(queue, opexec, job)()
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         if idx == len(ops) - 1:
           # Last opcode
           self.assert_(result)
@@ -528,6 +552,8 @@ class TestJobProcessor(unittest.TestCase):
           self.assert_(job.start_timestamp)
           self.assertFalse(job.end_timestamp)
 
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
       self.assertEqual(job.GetInfo(["opresult"]),
@@ -564,10 +590,18 @@ class TestJobProcessor(unittest.TestCase):
       # Create job
       job = self._CreateJob(queue, job_id, ops)
 
-      opexec = _FakeExecOpCodeForProc(None, None)
+      opexec = _FakeExecOpCodeForProc(queue, None, None)
 
       for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         result = jqueue._JobProcessor(queue, opexec, job)()
+        # queued to waitlock
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # waitlock to running
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # Opcode result
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
 
         if idx in (failfrom, len(ops) - 1):
           # Last opcode
@@ -578,6 +612,8 @@ class TestJobProcessor(unittest.TestCase):
 
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
       # Check job status
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
       self.assertEqual(job.GetInfo(["id"]), [job_id])
@@ -627,11 +663,54 @@ class TestJobProcessor(unittest.TestCase):
     (success, _) = job.Cancel()
     self.assert_(success)
 
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
                             for op in job.ops))
 
-    opexec = _FakeExecOpCodeForProc(None, None)
-    jqueue._JobProcessor(queue, opexec, job)()
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 8645
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_WAITLOCK
+
+    assert len(job.ops) == 5
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    # Mark as cancelling
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                            for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -656,7 +735,9 @@ class TestJobProcessor(unittest.TestCase):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    def _BeforeStart():
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -666,14 +747,64 @@ class TestJobProcessor(unittest.TestCase):
 
       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
     def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
-    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
-    jqueue._JobProcessor(queue, opexec, job)()
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockWithTimeout(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 24314
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart(timeout, priority):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelling
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+
+      # Fake an acquire attempt timing out
+      raise mcpu.LockAcquireTimeout()
+
+    def _AfterStart(op, cbs):
+      self.fail("Should not reach this")
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -699,7 +830,7 @@ class TestJobProcessor(unittest.TestCase):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     # Run one opcode
     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
@@ -735,7 +866,7 @@ class TestJobProcessor(unittest.TestCase):
     # program was restarted
     queue = _FakeQueueForProc()
 
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
@@ -768,7 +899,7 @@ class TestJobProcessor(unittest.TestCase):
     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
 
     queue = _FakeQueueForProc()
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     for remaining in reversed(range(len(job.ops) - successcount)):
       result = jqueue._JobProcessor(queue, opexec, job)()
@@ -806,7 +937,7 @@ class TestJobProcessor(unittest.TestCase):
     ops = [opcodes.OpTestDummy(result="result", fail=False)]
 
     queue = _FakeQueueForProc()
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     # Create job
     job = self._CreateJob(queue, 9571, ops)
@@ -845,11 +976,15 @@ class TestJobProcessor(unittest.TestCase):
     # Create job
     job = self._CreateJob(queue, 29386, ops)
 
-    def _BeforeStart():
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
     def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
@@ -857,15 +992,22 @@ class TestJobProcessor(unittest.TestCase):
                         "too", "many", "arguments")
 
       for (log_type, msg) in op.messages:
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         if log_type:
           cbs.Feedback(log_type, msg)
         else:
           cbs.Feedback(msg)
+        # Check for job update without replication
+        self.assertEqual(queue.GetNextUpdate(), (job, False))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
 
-    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
     for remaining in reversed(range(len(job.ops))):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
       if remaining == 0:
         # Last opcode
@@ -876,6 +1018,8 @@ class TestJobProcessor(unittest.TestCase):
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
     self.assertEqual(job.GetInfo(["opresult"]),
                      [[op.input.result for op in job.ops]])
@@ -922,5 +1066,237 @@ class TestJobProcessor(unittest.TestCase):
     self.assertFalse(job.GetLogEntries(count + 3))
 
 
+class _FakeTimeoutStrategy:
+  def __init__(self, timeouts):
+    self.timeouts = timeouts
+    self.attempts = 0
+    self.last_timeout = None
+
+  def NextAttempt(self):
+    self.attempts += 1
+    if self.timeouts:
+      timeout = self.timeouts.pop(0)
+    else:
+      timeout = None
+    self.last_timeout = timeout
+    return timeout
+
+
+class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
+  def setUp(self):
+    self.queue = _FakeQueueForProc()
+    self.job = None
+    self.curop = None
+    self.opcounter = None
+    self.timeout_strategy = None
+    self.retries = 0
+    self.prev_tsop = None
+    self.prev_prio = None
+    self.prev_status = None
+    self.lock_acq_prio = None
+    self.gave_lock = None
+    self.done_lock_before_blocking = False
+
+  def _BeforeStart(self, timeout, priority):
+    job = self.job
+
+    # If status has changed, job must've been written
+    if self.prev_status != self.job.ops[self.curop].status:
+      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    ts = self.timeout_strategy
+
+    self.assert_(timeout is None or isinstance(timeout, (int, float)))
+    self.assertEqual(timeout, ts.last_timeout)
+    self.assertEqual(priority, job.ops[self.curop].priority)
+
+    self.gave_lock = True
+    self.lock_acq_prio = priority
+
+    if (self.curop == 3 and
+        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
+      # Give locks before running into blocking acquire
+      assert self.retries == 7
+      self.retries = 0
+      self.done_lock_before_blocking = True
+      return
+
+    if self.retries > 0:
+      self.assert_(timeout is not None)
+      self.retries -= 1
+      self.gave_lock = False
+      raise mcpu.LockAcquireTimeout()
+
+    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
+      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
+      assert not ts.timeouts
+      self.assert_(timeout is None)
+
+  def _AfterStart(self, op, cbs):
+    job = self.job
+
+    # Setting to "running" requires an update
+    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Job is running, cancelling shouldn't be possible
+    (success, _) = job.Cancel()
+    self.assertFalse(success)
+
+  def _NextOpcode(self):
+    self.curop = self.opcounter.next()
+    self.prev_prio = self.job.ops[self.curop].priority
+    self.prev_status = self.job.ops[self.curop].status
+
+  def _NewTimeoutStrategy(self):
+    job = self.job
+
+    self.assertEqual(self.retries, 0)
+
+    if self.prev_tsop == self.curop:
+      # Still on the same opcode, priority must've been increased
+      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
+
+    if self.curop == 1:
+      # Normal retry
+      timeouts = range(10, 31, 10)
+      self.retries = len(timeouts) - 1
+
+    elif self.curop == 2:
+      # Let this run into a blocking acquire
+      timeouts = range(11, 61, 12)
+      self.retries = len(timeouts)
+
+    elif self.curop == 3:
+      # Wait for priority to increase, but give lock before blocking acquire
+      timeouts = range(12, 100, 14)
+      self.retries = len(timeouts)
+
+      self.assertFalse(self.done_lock_before_blocking)
+
+    elif self.curop == 4:
+      self.assert_(self.done_lock_before_blocking)
+
+      # Timeouts are available, but no acquire timeout is faked (no retries)
+      timeouts = range(10, 31, 10)
+      self.retries = 0
+
+    elif self.curop == 5:
+      # Normal retry
+      timeouts = range(19, 100, 11)
+      self.retries = len(timeouts)
+
+    else:
+      timeouts = []
+      self.retries = 0
+
+    assert len(job.ops) == 10
+    assert self.retries <= len(timeouts)
+
+    ts = _FakeTimeoutStrategy(timeouts)
+
+    self.timeout_strategy = ts
+    self.prev_tsop = self.curop
+    self.prev_prio = job.ops[self.curop].priority
+
+    return ts
+
+  def testTimeout(self):
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(10)]
+
+    # Create job
+    job_id = 15801
+    job = self._CreateJob(self.queue, job_id, ops)
+    self.job = job
+
+    self.opcounter = itertools.count(0)
+
+    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
+                                    self._AfterStart)
+    tsf = self._NewTimeoutStrategy
+
+    self.assertFalse(self.done_lock_before_blocking)
+
+    while True:
+      proc = jqueue._JobProcessor(self.queue, opexec, job,
+                                  _timeout_strategy_factory=tsf)
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      if self.curop is not None:
+        self.prev_status = self.job.ops[self.curop].status
+
+      self.lock_acq_prio = None
+
+      result = proc(_nextop_fn=self._NextOpcode)
+      assert self.curop is not None
+
+      if result or self.gave_lock:
+        # Got lock and/or job is done, result must've been written
+        self.assertFalse(job.cur_opctx)
+        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, self.queue.GetNextUpdate)
+        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
+        self.assert_(job.ops[self.curop].exec_timestamp)
+
+      if result:
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertFalse(result)
+
+      if self.curop == 0:
+        self.assertEqual(job.ops[self.curop].start_timestamp,
+                         job.start_timestamp)
+
+      if self.gave_lock:
+        # Opcode finished, but job not yet done
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      else:
+        # Did not get locks
+        self.assert_(job.cur_opctx)
+        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
+                         self.timeout_strategy.NextAttempt)
+        self.assertFalse(job.ops[self.curop].exec_timestamp)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+        # If priority has changed since acquiring locks, the job must've been
+        # updated
+        if self.lock_acq_prio != job.ops[self.curop].priority:
+          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(self.curop, len(job.ops) - 1)
+    self.assertEqual(self.job, job)
+    self.assertEqual(self.opcounter.next(), len(job.ops))
+    self.assert_(self.done_lock_before_blocking)
+
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(self.queue, opexec, job))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()