Rename OpGetTags and LUGetTags
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index ab6ce22..9137927 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -27,6 +27,7 @@ import unittest
 import tempfile
 import shutil
 import errno
+import itertools
 
 from ganeti import constants
 from ganeti import utils
@@ -34,6 +35,7 @@ from ganeti import errors
 from ganeti import jqueue
 from ganeti import opcodes
 from ganeti import compat
+from ganeti import mcpu
 
 import testutils
 
@@ -266,7 +268,7 @@ class TestQueuedOpCode(unittest.TestCase):
       self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
       self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
 
-    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
+    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
     op1 = jqueue._QueuedOpCode(inpop)
     _Check(op1)
     op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
@@ -275,10 +277,14 @@ class TestQueuedOpCode(unittest.TestCase):
 
 
 class TestQueuedJob(unittest.TestCase):
+  def test(self):
+    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
+                      None, 1, [])
+
   def testDefaults(self):
     job_id = 4260
     ops = [
-      opcodes.OpGetTags(),
+      opcodes.OpTagsGet(),
       opcodes.OpTestDelay(),
       ]
 
@@ -294,6 +300,10 @@ class TestQueuedJob(unittest.TestCase):
       self.assertEqual(len(job.ops), len(ops))
       self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
                               for (inp, op) in zip(ops, job.ops)))
+      self.assertRaises(errors.OpExecError, job.GetInfo,
+                        ["unknown-field"])
+      self.assertEqual(job.GetInfo(["summary"]),
+                       [[op.input.Summary() for op in job.ops]])
 
     job1 = jqueue._QueuedJob(None, job_id, ops)
     _Check(job1)
@@ -304,7 +314,7 @@ class TestQueuedJob(unittest.TestCase):
   def testPriority(self):
     job_id = 4283
     ops = [
-      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
+      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
       opcodes.OpTestDelay(),
       ]
 
@@ -338,6 +348,955 @@ class TestQueuedJob(unittest.TestCase):
     job.ops[0].priority -= 19
     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
 
+  def testCalcStatus(self):
+    def _Queued(ops):
+      # The default status is "queued"
+      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
+                              for op in ops))
+
+    def _Waitlock1(ops):
+      ops[0].status = constants.OP_STATUS_WAITLOCK
+
+    def _Waitlock2(ops):
+      ops[0].status = constants.OP_STATUS_SUCCESS
+      ops[1].status = constants.OP_STATUS_SUCCESS
+      ops[2].status = constants.OP_STATUS_WAITLOCK
+
+    def _Running(ops):
+      ops[0].status = constants.OP_STATUS_SUCCESS
+      ops[1].status = constants.OP_STATUS_RUNNING
+      for op in ops[2:]:
+        op.status = constants.OP_STATUS_QUEUED
+
+    def _Canceling1(ops):
+      ops[0].status = constants.OP_STATUS_SUCCESS
+      ops[1].status = constants.OP_STATUS_SUCCESS
+      for op in ops[2:]:
+        op.status = constants.OP_STATUS_CANCELING
+
+    def _Canceling2(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_CANCELING
+
+    def _Canceled(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_CANCELED
+
+    def _Error1(ops):
+      for idx, op in enumerate(ops):
+        if idx > 3:
+          op.status = constants.OP_STATUS_ERROR
+        else:
+          op.status = constants.OP_STATUS_SUCCESS
+
+    def _Error2(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_ERROR
+
+    def _Success(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_SUCCESS
+
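+    # Map each expected job status to the mutator functions above that
+    # should produce it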
+    tests = {
+      constants.JOB_STATUS_QUEUED: [_Queued],
+      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
+      constants.JOB_STATUS_RUNNING: [_Running],
+      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
+      constants.JOB_STATUS_CANCELED: [_Canceled],
+      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
+      constants.JOB_STATUS_SUCCESS: [_Success],
+      }
+
+    def _NewJob():
+      job = jqueue._QueuedJob(None, 1,
+                              [opcodes.OpTestDelay() for _ in range(10)])
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
+                              for op in job.ops))
+      return job
+
+    for status in constants.JOB_STATUS_ALL:
+      sttests = tests[status]
+      assert sttests
+      for fn in sttests:
+        job = _NewJob()
+        fn(job.ops)
+        self.assertEqual(job.CalcStatus(), status)
+
+
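+# Minimal stand-in for the job queue: tracks whether its lock is held and
+# records every UpdateJobUnlocked call (together with the replication flag)
+# so tests can assert exactly when the job was written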
+class _FakeQueueForProc:
+  def __init__(self):
+    self._acquired = False
+    self._updates = []
+
+  def IsAcquired(self):
+    return self._acquired
+
+  def GetNextUpdate(self):
+    return self._updates.pop(0)
+
+  def acquire(self, shared=0):
+    assert shared == 1
+    self._acquired = True
+
+  def release(self):
+    assert self._acquired
+    self._acquired = False
+
+  def UpdateJobUnlocked(self, job, replicate=True):
+    assert self._acquired, "Lock not acquired while updating job"
+    self._updates.append((job, bool(replicate)))
+
+
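+# Replacement for the real opcode executor: runs the optional before/after
+# hooks around cbs.NotifyStart(), checks that the queue lock is released
+# while the opcode runs, and either returns op.result or raises OpExecError
+# when op.fail is set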
+class _FakeExecOpCodeForProc:
+  def __init__(self, queue, before_start, after_start):
+    self._queue = queue
+    self._before_start = before_start
+    self._after_start = after_start
+
+  def __call__(self, op, cbs, timeout=None, priority=None):
+    assert isinstance(op, opcodes.OpTestDummy)
+    assert not self._queue.IsAcquired(), \
+           "Queue lock not released when executing opcode"
+
+    if self._before_start:
+      self._before_start(timeout, priority)
+
+    cbs.NotifyStart()
+
+    if self._after_start:
+      self._after_start(op, cbs)
+
+    # Check again after the callbacks
+    assert not self._queue.IsAcquired()
+
+    if op.fail:
+      raise errors.OpExecError("Error requested (%s)" % op.result)
+
+    return op.result
+
+
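+# Mixin providing a helper to build a _QueuedJob and verify its initial state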
+class _JobProcessorTestUtils:
+  def _CreateJob(self, queue, job_id, ops):
+    job = jqueue._QueuedJob(queue, job_id, ops)
+    self.assertFalse(job.start_timestamp)
+    self.assertFalse(job.end_timestamp)
+    self.assertEqual(len(ops), len(job.ops))
+    self.assert_(compat.all(op.input == inp
+                            for (op, inp) in zip(job.ops, ops)))
+    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
+    return job
+
+
+class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
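+  # Common checks on a finished job: per-opcode and job-level timestamps are
+  # exposed through GetInfo, and the job's start timestamp matches that of
+  # its first opcode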
+  def _GenericCheckJob(self, job):
+    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
+                      for op in job.ops)
+
+    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
+                     [[op.start_timestamp for op in job.ops],
+                      [op.exec_timestamp for op in job.ops],
+                      [op.end_timestamp for op in job.ops]])
+    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
+                     [job.received_timestamp,
+                      job.start_timestamp,
+                      job.end_timestamp])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+
+  def testSuccess(self):
+    queue = _FakeQueueForProc()
+
+    for (job_id, opcount) in [(25351, 1), (6637, 3),
+                              (24644, 10), (32207, 100)]:
+      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+             for i in range(opcount)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      def _BeforeStart(timeout, priority):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        self.assertFalse(queue.IsAcquired())
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertFalse(job.cur_opctx)
+
+      def _AfterStart(op, cbs):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
+        self.assertFalse(queue.IsAcquired())
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+        self.assertFalse(job.cur_opctx)
+
+        # Job is running, cancelling shouldn't be possible
+        (success, _) = job.Cancel()
+        self.assertFalse(success)
+
+      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+      for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        result = jqueue._JobProcessor(queue, opexec, job)()
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        if idx == len(ops) - 1:
+          # Last opcode
+          self.assert_(result)
+        else:
+          self.assertFalse(result)
+
+          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+          self.assert_(job.start_timestamp)
+          self.assertFalse(job.end_timestamp)
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+      self.assertEqual(job.GetInfo(["opresult"]),
+                       [[op.input.result for op in job.ops]])
+      self.assertEqual(job.GetInfo(["opstatus"]),
+                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                              for op in job.ops))
+
+      self._GenericCheckJob(job)
+
+      # Finished jobs can't be processed any further
+      self.assertRaises(errors.ProgrammerError,
+                        jqueue._JobProcessor(queue, opexec, job))
+
+  def testOpcodeError(self):
+    queue = _FakeQueueForProc()
+
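+    # Each tuple is (job id, number of opcodes, index of the first failing
+    # opcode, index of the last failing opcode)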
+    testdata = [
+      (17077, 1, 0, 0),
+      (1782, 5, 2, 2),
+      (18179, 10, 9, 9),
+      (4744, 10, 3, 8),
+      (23816, 100, 39, 45),
+      ]
+
+    for (job_id, opcount, failfrom, failto) in testdata:
+      # Prepare opcodes
+      ops = [opcodes.OpTestDummy(result="Res%s" % i,
+                                 fail=(failfrom <= i and
+                                       i <= failto))
+             for i in range(opcount)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+      for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        result = jqueue._JobProcessor(queue, opexec, job)()
+        # queued to waitlock
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # waitlock to running
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # Opcode result
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
+        if idx in (failfrom, len(ops) - 1):
+          # First failing opcode or the last opcode; the job ends here
+          self.assert_(result)
+          break
+
+        self.assertFalse(result)
+
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      # Check job status
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+      self.assertEqual(job.GetInfo(["id"]), [job_id])
+      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
+
+      # Check opcode status
+      data = zip(job.ops,
+                 job.GetInfo(["opstatus"])[0],
+                 job.GetInfo(["opresult"])[0])
+
+      for idx, (op, opstatus, opresult) in enumerate(data):
+        if idx < failfrom:
+          assert not op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
+          self.assertEqual(opresult, op.input.result)
+        elif idx <= failto:
+          assert op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
+          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
+        else:
+          assert not op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
+          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
+
+      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                              for op in job.ops[:failfrom]))
+
+      self._GenericCheckJob(job)
+
+      # Finished jobs can't be processed any further
+      self.assertRaises(errors.ProgrammerError,
+                        jqueue._JobProcessor(queue, opexec, job))
+
+  def testCancelWhileInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 17045
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    # Mark as cancelled
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
+                            for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 8645
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_WAITLOCK
+
+    assert len(job.ops) == 5
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    # Mark as cancelling
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                            for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlock(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 11009
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelled
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockWithTimeout(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 24314
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart(timeout, priority):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelled
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+
+      # Fake an acquire attempt timing out
+      raise mcpu.LockAcquireTimeout()
+
+    def _AfterStart(op, cbs):
+      self.fail("Should not reach this")
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileRunning(self):
+    # Tests canceling a job with finished and still unprocessed opcodes
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 28492
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    # Run one opcode
+    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+
+    # Mark as cancelled
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    # Try processing another opcode (this will actually cancel the job)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_CANCELED,
+                       constants.OP_STATUS_CANCELED],
+                      ["Res0", "Job canceled by request",
+                       "Job canceled by request"]])
+
+  def testPartiallyRun(self):
+    # Tests calling the processor on a job that's been partially run before the
+    # program was restarted
+    queue = _FakeQueueForProc()
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
+      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+             for i in range(10)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+      for _ in range(successcount):
+        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assertEqual(job.GetInfo(["opstatus"]),
+                       [[constants.OP_STATUS_SUCCESS
+                         for _ in range(successcount)] +
+                        [constants.OP_STATUS_QUEUED
+                         for _ in range(len(ops) - successcount)]])
+
+      self.assert_(job.ops_iter)
+
+      # Serialize and restore (simulates program restart)
+      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+      self.assertFalse(newjob.ops_iter)
+      self._TestPartial(newjob, successcount)
+
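+  # Runs the remaining opcodes of a restored, partially processed job and
+  # checks that it finishes successfully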
+  def _TestPartial(self, job, successcount):
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+
+    queue = _FakeQueueForProc()
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    for remaining in reversed(range(len(job.ops) - successcount)):
+      result = jqueue._JobProcessor(queue, opexec, job)()
+
+      if remaining == 0:
+        # Last opcode
+        self.assert_(result)
+        break
+
+      self.assertFalse(result)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    self._GenericCheckJob(job)
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
+    # ... also after being restored
+    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job2))
+
+  def testProcessorOnRunningJob(self):
+    ops = [opcodes.OpTestDummy(result="result", fail=False)]
+
+    queue = _FakeQueueForProc()
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    # Create job
+    job = self._CreateJob(queue, 9571, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_RUNNING
+
+    assert len(job.ops) == 1
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Calling on running job must fail
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
+  def testLogMessages(self):
+    # Tests the "Feedback" callback function
+    queue = _FakeQueueForProc()
+
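+    # Maps opcode index to (log type, message) pairs; a None type means the
+    # message is sent through the single-argument form of Feedback and is
+    # expected to be recorded as ELOG_MESSAGE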
+    messages = {
+      1: [
+        (None, "Hello"),
+        (None, "World"),
+        (constants.ELOG_MESSAGE, "there"),
+        ],
+      4: [
+        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
+        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
+        ],
+      }
+    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
+                               messages=messages.get(i, []))
+           for i in range(5)]
+
+    # Create job
+    job = self._CreateJob(queue, 29386, ops)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+      self.assertRaises(AssertionError, cbs.Feedback,
+                        "too", "many", "arguments")
+
+      for (log_type, msg) in op.messages:
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        if log_type:
+          cbs.Feedback(log_type, msg)
+        else:
+          cbs.Feedback(msg)
+        # Check for job update without replication
+        self.assertEqual(queue.GetNextUpdate(), (job, False))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    for remaining in reversed(range(len(job.ops))):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      if remaining == 0:
+        # Last opcode
+        self.assert_(result)
+        break
+
+      self.assertFalse(result)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+
+    logmsgcount = sum(len(m) for m in messages.values())
+
+    self._CheckLogMessages(job, logmsgcount)
+
+    # Serialize and restore (simulates program restart)
+    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    self._CheckLogMessages(newjob, logmsgcount)
+
+    # Check each message
+    prevserial = -1
+    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
+      for (serial, timestamp, log_type, msg) in oplog:
+        (exptype, expmsg) = messages.get(idx).pop(0)
+        if exptype:
+          self.assertEqual(log_type, exptype)
+        else:
+          self.assertEqual(log_type, constants.ELOG_MESSAGE)
+        self.assertEqual(expmsg, msg)
+        self.assert_(serial > prevserial)
+        prevserial = serial
+
+  def _CheckLogMessages(self, job, count):
+    # Check serial
+    self.assertEqual(job.log_serial, count)
+
+    # No filter
+    self.assertEqual(job.GetLogEntries(None),
+                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
+                      for entry in entries])
+
+    # Filter with serial
+    assert count > 3
+    self.assert_(job.GetLogEntries(3))
+    self.assertEqual(job.GetLogEntries(3),
+                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
+                      for entry in entries][3:])
+
+    # No log message after highest serial
+    self.assertFalse(job.GetLogEntries(count))
+    self.assertFalse(job.GetLogEntries(count + 3))
+
+
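+# Timeout strategy returning a predefined sequence of timeouts; once the
+# list is exhausted, NextAttempt returns None (no timeout, i.e. a blocking
+# lock acquire)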
+class _FakeTimeoutStrategy:
+  def __init__(self, timeouts):
+    self.timeouts = timeouts
+    self.attempts = 0
+    self.last_timeout = None
+
+  def NextAttempt(self):
+    self.attempts += 1
+    if self.timeouts:
+      timeout = self.timeouts.pop(0)
+    else:
+      timeout = None
+    self.last_timeout = timeout
+    return timeout
+
+
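+# Exercises _JobProcessor together with lock acquire timeouts: opcodes run
+# into LockAcquireTimeout, the opcode priority is raised whenever processing
+# resumes on the same opcode, and eventually the acquire becomes blocking
+# (timeout None) at the highest priority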
+class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
+  def setUp(self):
+    self.queue = _FakeQueueForProc()
+    self.job = None
+    self.curop = None
+    self.opcounter = None
+    self.timeout_strategy = None
+    self.retries = 0
+    self.prev_tsop = None
+    self.prev_prio = None
+    self.prev_status = None
+    self.lock_acq_prio = None
+    self.gave_lock = None
+    self.done_lock_before_blocking = False
+
+  def _BeforeStart(self, timeout, priority):
+    job = self.job
+
+    # If status has changed, job must've been written
+    if self.prev_status != self.job.ops[self.curop].status:
+      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    ts = self.timeout_strategy
+
+    self.assert_(timeout is None or isinstance(timeout, (int, float)))
+    self.assertEqual(timeout, ts.last_timeout)
+    self.assertEqual(priority, job.ops[self.curop].priority)
+
+    self.gave_lock = True
+    self.lock_acq_prio = priority
+
+    if (self.curop == 3 and
+        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
+      # Give locks before running into blocking acquire
+      assert self.retries == 7
+      self.retries = 0
+      self.done_lock_before_blocking = True
+      return
+
+    if self.retries > 0:
+      self.assert_(timeout is not None)
+      self.retries -= 1
+      self.gave_lock = False
+      raise mcpu.LockAcquireTimeout()
+
+    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
+      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
+      assert not ts.timeouts
+      self.assert_(timeout is None)
+
+  def _AfterStart(self, op, cbs):
+    job = self.job
+
+    # Setting to "running" requires an update
+    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Job is running, cancelling shouldn't be possible
+    (success, _) = job.Cancel()
+    self.assertFalse(success)
+
+  def _NextOpcode(self):
+    self.curop = self.opcounter.next()
+    self.prev_prio = self.job.ops[self.curop].priority
+    self.prev_status = self.job.ops[self.curop].status
+
+  def _NewTimeoutStrategy(self):
+    job = self.job
+
+    self.assertEqual(self.retries, 0)
+
+    if self.prev_tsop == self.curop:
+      # Still on the same opcode, priority must've been increased
+      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
+
+    if self.curop == 1:
+      # Normal retry
+      timeouts = range(10, 31, 10)
+      self.retries = len(timeouts) - 1
+
+    elif self.curop == 2:
+      # Let this run into a blocking acquire
+      timeouts = range(11, 61, 12)
+      self.retries = len(timeouts)
+
+    elif self.curop == 3:
+      # Wait for priority to increase, but give lock before blocking acquire
+      timeouts = range(12, 100, 14)
+      self.retries = len(timeouts)
+
+      self.assertFalse(self.done_lock_before_blocking)
+
+    elif self.curop == 4:
+      self.assert_(self.done_lock_before_blocking)
+
+      # Timeouts, but no need to retry
+      timeouts = range(10, 31, 10)
+      self.retries = 0
+
+    elif self.curop == 5:
+      # Normal retry
+      timeouts = range(19, 100, 11)
+      self.retries = len(timeouts)
+
+    else:
+      timeouts = []
+      self.retries = 0
+
+    assert len(job.ops) == 10
+    assert self.retries <= len(timeouts)
+
+    ts = _FakeTimeoutStrategy(timeouts)
+
+    self.timeout_strategy = ts
+    self.prev_tsop = self.curop
+    self.prev_prio = job.ops[self.curop].priority
+
+    return ts
+
+  def testTimeout(self):
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(10)]
+
+    # Create job
+    job_id = 15801
+    job = self._CreateJob(self.queue, job_id, ops)
+    self.job = job
+
+    self.opcounter = itertools.count(0)
+
+    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
+                                    self._AfterStart)
+    tsf = self._NewTimeoutStrategy
+
+    self.assertFalse(self.done_lock_before_blocking)
+
+    while True:
+      proc = jqueue._JobProcessor(self.queue, opexec, job,
+                                  _timeout_strategy_factory=tsf)
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      if self.curop is not None:
+        self.prev_status = self.job.ops[self.curop].status
+
+      self.lock_acq_prio = None
+
+      result = proc(_nextop_fn=self._NextOpcode)
+      assert self.curop is not None
+
+      if result or self.gave_lock:
+        # Got lock and/or job is done, result must've been written
+        self.assertFalse(job.cur_opctx)
+        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, self.queue.GetNextUpdate)
+        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
+        self.assert_(job.ops[self.curop].exec_timestamp)
+
+      if result:
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertFalse(result)
+
+      if self.curop == 0:
+        self.assertEqual(job.ops[self.curop].start_timestamp,
+                         job.start_timestamp)
+
+      if self.gave_lock:
+        # Opcode finished, but job not yet done
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      else:
+        # Did not get locks
+        self.assert_(job.cur_opctx)
+        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
+                         self.timeout_strategy.NextAttempt)
+        self.assertFalse(job.ops[self.curop].exec_timestamp)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+        # If priority has changed since acquiring locks, the job must've been
+        # updated
+        if self.lock_acq_prio != job.ops[self.curop].priority:
+          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(self.curop, len(job.ops) - 1)
+    self.assertEqual(self.job, job)
+    self.assertEqual(self.opcounter.next(), len(job.ops))
+    self.assert_(self.done_lock_before_blocking)
+
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(self.queue, opexec, job))
+
 
 if __name__ == "__main__":
   testutils.GanetiTestProgram()