--select-instances hbal manpage update
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index e6746ee..a0dd025 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -27,11 +27,15 @@ import unittest
 import tempfile
 import shutil
 import errno
+import itertools
 
 from ganeti import constants
 from ganeti import utils
 from ganeti import errors
 from ganeti import jqueue
+from ganeti import opcodes
+from ganeti import compat
+from ganeti import mcpu
 
 import testutils
 
@@ -239,5 +243,1174 @@ class TestEncodeOpError(unittest.TestCase):
     self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
 
 
+class TestQueuedOpCode(unittest.TestCase):
+  def testDefaults(self):
+    def _Check(op):
+      self.assertFalse(hasattr(op.input, "dry_run"))
+      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
+      self.assertFalse(op.log)
+      self.assert_(op.start_timestamp is None)
+      self.assert_(op.exec_timestamp is None)
+      self.assert_(op.end_timestamp is None)
+      self.assert_(op.result is None)
+      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
+
+    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
+    _Check(op1)
+    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
+    _Check(op2)
+    self.assertEqual(op1.Serialize(), op2.Serialize())
+
+  def testPriority(self):
+    def _Check(op):
+      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
+             "Default priority equals high priority; test can't work"
+      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
+      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
+
+    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
+    op1 = jqueue._QueuedOpCode(inpop)
+    _Check(op1)
+    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
+    _Check(op2)
+    self.assertEqual(op1.Serialize(), op2.Serialize())
+
+
+class TestQueuedJob(unittest.TestCase):
+  def test(self):
+    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
+                      None, 1, [])
+
+  def testDefaults(self):
+    job_id = 4260
+    ops = [
+      opcodes.OpTagsGet(),
+      opcodes.OpTestDelay(),
+      ]
+
+    def _Check(job):
+      self.assertEqual(job.id, job_id)
+      self.assertEqual(job.log_serial, 0)
+      self.assert_(job.received_timestamp)
+      self.assert_(job.start_timestamp is None)
+      self.assert_(job.end_timestamp is None)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+      self.assert_(repr(job).startswith("<"))
+      self.assertEqual(len(job.ops), len(ops))
+      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
+                              for (inp, op) in zip(ops, job.ops)))
+      self.assertRaises(errors.OpExecError, job.GetInfo,
+                        ["unknown-field"])
+      self.assertEqual(job.GetInfo(["summary"]),
+                       [[op.input.Summary() for op in job.ops]])
+
+    job1 = jqueue._QueuedJob(None, job_id, ops)
+    _Check(job1)
+    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
+    _Check(job2)
+    self.assertEqual(job1.Serialize(), job2.Serialize())
+
+  def testPriority(self):
+    job_id = 4283
+    ops = [
+      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
+      opcodes.OpTestDelay(),
+      ]
+
+    def _Check(job):
+      self.assertEqual(job.id, job_id)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(repr(job).startswith("<"))
+
+    job = jqueue._QueuedJob(None, job_id, ops)
+    _Check(job)
+    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                            for op in job.ops))
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+
+    # Increase first
+    job.ops[0].priority -= 1
+    _Check(job)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
+
+    # Mark opcode as finished
+    job.ops[0].status = constants.OP_STATUS_SUCCESS
+    _Check(job)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+
+    # Increase second
+    job.ops[1].priority -= 10
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
+
+    # Test increasing first
+    job.ops[0].status = constants.OP_STATUS_RUNNING
+    job.ops[0].priority -= 19
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
+
+  def testCalcStatus(self):
+    def _Queued(ops):
+      # The default status is "queued"
+      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
+                              for op in ops))
+
+    def _Waitlock1(ops):
+      ops[0].status = constants.OP_STATUS_WAITLOCK
+
+    def _Waitlock2(ops):
+      ops[0].status = constants.OP_STATUS_SUCCESS
+      ops[1].status = constants.OP_STATUS_SUCCESS
+      ops[2].status = constants.OP_STATUS_WAITLOCK
+
+    def _Running(ops):
+      ops[0].status = constants.OP_STATUS_SUCCESS
+      ops[1].status = constants.OP_STATUS_RUNNING
+      for op in ops[2:]:
+        op.status = constants.OP_STATUS_QUEUED
+
+    def _Canceling1(ops):
+      ops[0].status = constants.OP_STATUS_SUCCESS
+      ops[1].status = constants.OP_STATUS_SUCCESS
+      for op in ops[2:]:
+        op.status = constants.OP_STATUS_CANCELING
+
+    def _Canceling2(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_CANCELING
+
+    def _Canceled(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_CANCELED
+
+    def _Error1(ops):
+      for idx, op in enumerate(ops):
+        if idx > 3:
+          op.status = constants.OP_STATUS_ERROR
+        else:
+          op.status = constants.OP_STATUS_SUCCESS
+
+    def _Error2(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_ERROR
+
+    def _Success(ops):
+      for op in ops:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    tests = {
+      constants.JOB_STATUS_QUEUED: [_Queued],
+      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
+      constants.JOB_STATUS_RUNNING: [_Running],
+      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
+      constants.JOB_STATUS_CANCELED: [_Canceled],
+      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
+      constants.JOB_STATUS_SUCCESS: [_Success],
+      }
+
+    def _NewJob():
+      job = jqueue._QueuedJob(None, 1,
+                              [opcodes.OpTestDelay() for _ in range(10)])
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
+                              for op in job.ops))
+      return job
+
+    for status in constants.JOB_STATUS_ALL:
+      sttests = tests[status]
+      assert sttests
+      for fn in sttests:
+        job = _NewJob()
+        fn(job.ops)
+        self.assertEqual(job.CalcStatus(), status)
+
+
+class _FakeQueueForProc:
+  def __init__(self):
+    self._acquired = False
+    self._updates = []
+    self._submitted = []
+
+    self._submit_count = itertools.count(1000)
+
+  def IsAcquired(self):
+    return self._acquired
+
+  def GetNextUpdate(self):
+    return self._updates.pop(0)
+
+  def GetNextSubmittedJob(self):
+    return self._submitted.pop(0)
+
+  def acquire(self, shared=0):
+    assert shared == 1
+    self._acquired = True
+
+  def release(self):
+    assert self._acquired
+    self._acquired = False
+
+  def UpdateJobUnlocked(self, job, replicate=True):
+    assert self._acquired, "Lock not acquired while updating job"
+    self._updates.append((job, bool(replicate)))
+
+  def SubmitManyJobs(self, jobs):
+    assert not self._acquired, "Lock acquired while submitting jobs"
+    job_ids = [self._submit_count.next() for _ in jobs]
+    self._submitted.extend(zip(job_ids, jobs))
+    return job_ids
+
+
+class _FakeExecOpCodeForProc:
+  def __init__(self, queue, before_start, after_start):
+    self._queue = queue
+    self._before_start = before_start
+    self._after_start = after_start
+
+  def __call__(self, op, cbs, timeout=None, priority=None):
+    assert isinstance(op, opcodes.OpTestDummy)
+    assert not self._queue.IsAcquired(), \
+           "Queue lock not released when executing opcode"
+
+    if self._before_start:
+      self._before_start(timeout, priority)
+
+    cbs.NotifyStart()
+
+    if self._after_start:
+      self._after_start(op, cbs)
+
+    # Check again after the callbacks
+    assert not self._queue.IsAcquired()
+
+    if op.fail:
+      raise errors.OpExecError("Error requested (%s)" % op.result)
+
+    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
+      return cbs.SubmitManyJobs(op.submit_jobs)
+
+    return op.result
+
+
+class _JobProcessorTestUtils:
+  def _CreateJob(self, queue, job_id, ops):
+    job = jqueue._QueuedJob(queue, job_id, ops)
+    self.assertFalse(job.start_timestamp)
+    self.assertFalse(job.end_timestamp)
+    self.assertEqual(len(ops), len(job.ops))
+    self.assert_(compat.all(op.input == inp
+                            for (op, inp) in zip(job.ops, ops)))
+    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
+    return job
+
+
+class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
+  def _GenericCheckJob(self, job):
+    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
+                      for op in job.ops)
+
+    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
+                     [[op.start_timestamp for op in job.ops],
+                      [op.exec_timestamp for op in job.ops],
+                      [op.end_timestamp for op in job.ops]])
+    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
+                     [job.received_timestamp,
+                      job.start_timestamp,
+                      job.end_timestamp])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+
+  def testSuccess(self):
+    queue = _FakeQueueForProc()
+
+    for (job_id, opcount) in [(25351, 1), (6637, 3),
+                              (24644, 10), (32207, 100)]:
+      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+             for i in range(opcount)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      def _BeforeStart(timeout, priority):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        self.assertFalse(queue.IsAcquired())
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertFalse(job.cur_opctx)
+
+      def _AfterStart(op, cbs):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
+        self.assertFalse(queue.IsAcquired())
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+        self.assertFalse(job.cur_opctx)
+
+        # Job is running, cancelling shouldn't be possible
+        (success, _) = job.Cancel()
+        self.assertFalse(success)
+
+      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+      for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        result = jqueue._JobProcessor(queue, opexec, job)()
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        if idx == len(ops) - 1:
+          # Last opcode
+          self.assert_(result)
+        else:
+          self.assertFalse(result)
+
+          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+          self.assert_(job.start_timestamp)
+          self.assertFalse(job.end_timestamp)
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+      self.assertEqual(job.GetInfo(["opresult"]),
+                       [[op.input.result for op in job.ops]])
+      self.assertEqual(job.GetInfo(["opstatus"]),
+                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                              for op in job.ops))
+
+      self._GenericCheckJob(job)
+
+      # Calling the processor on a finished job should be a no-op
+      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testOpcodeError(self):
+    queue = _FakeQueueForProc()
+
+    testdata = [
+      (17077, 1, 0, 0),
+      (1782, 5, 2, 2),
+      (18179, 10, 9, 9),
+      (4744, 10, 3, 8),
+      (23816, 100, 39, 45),
+      ]
+
+    for (job_id, opcount, failfrom, failto) in testdata:
+      # Prepare opcodes
+      ops = [opcodes.OpTestDummy(result="Res%s" % i,
+                                 fail=(failfrom <= i and
+                                       i <= failto))
+             for i in range(opcount)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+      for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        result = jqueue._JobProcessor(queue, opexec, job)()
+        # queued to waitlock
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # waitlock to running
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # Opcode result
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
+        if idx in (failfrom, len(ops) - 1):
+          # Last opcode
+          self.assert_(result)
+          break
+
+        self.assertFalse(result)
+
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      # Check job status
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+      self.assertEqual(job.GetInfo(["id"]), [job_id])
+      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
+
+      # Check opcode status
+      data = zip(job.ops,
+                 job.GetInfo(["opstatus"])[0],
+                 job.GetInfo(["opresult"])[0])
+
+      for idx, (op, opstatus, opresult) in enumerate(data):
+        if idx < failfrom:
+          assert not op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
+          self.assertEqual(opresult, op.input.result)
+        elif idx <= failto:
+          assert op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
+          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
+        else:
+          assert not op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
+          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
+
+      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                              for op in job.ops[:failfrom]))
+
+      self._GenericCheckJob(job)
+
+      # Calling the processor on a finished job should be a no-op
+      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testCancelWhileInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 17045
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    # Mark as cancelled
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assertFalse(job.start_timestamp)
+    self.assertTrue(job.end_timestamp)
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
+                            for op in job.ops))
+
+    # Serialize to check for differences
+    before_proc = job.Serialize()
+
+    # Simulate processor called in workerpool
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assertTrue(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+    # Must not have changed or written
+    self.assertEqual(before_proc, job.Serialize())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testCancelWhileWaitlockInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 8645
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_WAITLOCK
+
+    assert len(job.ops) == 5
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    # Mark as cancelling
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                            for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlock(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 11009
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelled
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockWithTimeout(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 24314
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart(timeout, priority):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelled
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+
+      # Fake an acquire attempt timing out
+      raise mcpu.LockAcquireTimeout()
+
+    def _AfterStart(op, cbs):
+      self.fail("Should not reach this")
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileRunning(self):
+    # Tests canceling a job with finished opcodes and more, unprocessed ones
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 28492
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    # Run one opcode
+    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+
+    # Mark as cancelled
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    # Try processing another opcode (this will actually cancel the job)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_CANCELED,
+                       constants.OP_STATUS_CANCELED],
+                      ["Res0", "Job canceled by request",
+                       "Job canceled by request"]])
+
+  def testPartiallyRun(self):
+    # Tests calling the processor on a job that's been partially run before the
+    # program was restarted
+    queue = _FakeQueueForProc()
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
+      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+             for i in range(10)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+      for _ in range(successcount):
+        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assertEqual(job.GetInfo(["opstatus"]),
+                       [[constants.OP_STATUS_SUCCESS
+                         for _ in range(successcount)] +
+                        [constants.OP_STATUS_QUEUED
+                         for _ in range(len(ops) - successcount)]])
+
+      self.assert_(job.ops_iter)
+
+      # Serialize and restore (simulates program restart)
+      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+      self.assertFalse(newjob.ops_iter)
+      self._TestPartial(newjob, successcount)
+
+  def _TestPartial(self, job, successcount):
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+
+    queue = _FakeQueueForProc()
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    for remaining in reversed(range(len(job.ops) - successcount)):
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      if remaining == 0:
+        # Last opcode
+        self.assert_(result)
+        break
+
+      self.assertFalse(result)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    self._GenericCheckJob(job)
+
+    # Calling the processor on a finished job should be a no-op
+    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # ... also after being restored
+    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testProcessorOnRunningJob(self):
+    ops = [opcodes.OpTestDummy(result="result", fail=False)]
+
+    queue = _FakeQueueForProc()
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    # Create job
+    job = self._CreateJob(queue, 9571, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_RUNNING
+
+    assert len(job.ops) == 1
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Calling on running job must fail
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
+  def testLogMessages(self):
+    # Tests the "Feedback" callback function
+    queue = _FakeQueueForProc()
+
+    messages = {
+      1: [
+        (None, "Hello"),
+        (None, "World"),
+        (constants.ELOG_MESSAGE, "there"),
+        ],
+      4: [
+        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
+        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
+        ],
+      }
+    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
+                               messages=messages.get(i, []))
+           for i in range(5)]
+
+    # Create job
+    job = self._CreateJob(queue, 29386, ops)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+      self.assertRaises(AssertionError, cbs.Feedback,
+                        "too", "many", "arguments")
+
+      for (log_type, msg) in op.messages:
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+        if log_type:
+          cbs.Feedback(log_type, msg)
+        else:
+          cbs.Feedback(msg)
+        # Check for job update without replication
+        self.assertEqual(queue.GetNextUpdate(), (job, False))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    for remaining in reversed(range(len(job.ops))):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      if remaining == 0:
+        # Last opcode
+        self.assert_(result)
+        break
+
+      self.assertFalse(result)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+
+    logmsgcount = sum(len(m) for m in messages.values())
+
+    self._CheckLogMessages(job, logmsgcount)
+
+    # Serialize and restore (simulates program restart)
+    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    self._CheckLogMessages(newjob, logmsgcount)
+
+    # Check each message
+    prevserial = -1
+    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
+      for (serial, timestamp, log_type, msg) in oplog:
+        (exptype, expmsg) = messages.get(idx).pop(0)
+        if exptype:
+          self.assertEqual(log_type, exptype)
+        else:
+          self.assertEqual(log_type, constants.ELOG_MESSAGE)
+        self.assertEqual(expmsg, msg)
+        self.assert_(serial > prevserial)
+        prevserial = serial
+
+  def _CheckLogMessages(self, job, count):
+    # Check serial
+    self.assertEqual(job.log_serial, count)
+
+    # No filter
+    self.assertEqual(job.GetLogEntries(None),
+                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
+                      for entry in entries])
+
+    # Filter with serial
+    assert count > 3
+    self.assert_(job.GetLogEntries(3))
+    self.assertEqual(job.GetLogEntries(3),
+                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
+                      for entry in entries][3:])
+
+    # No log message after highest serial
+    self.assertFalse(job.GetLogEntries(count))
+    self.assertFalse(job.GetLogEntries(count + 3))
+
+  def testSubmitManyJobs(self):
+    queue = _FakeQueueForProc()
+
+    job_id = 15656
+    ops = [
+      opcodes.OpTestDummy(result="Res0", fail=False,
+                          submit_jobs=[]),
+      opcodes.OpTestDummy(result="Res1", fail=False,
+                          submit_jobs=[
+                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
+                            ]),
+      opcodes.OpTestDummy(result="Res2", fail=False,
+                          submit_jobs=[
+                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
+                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
+                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
+                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
+                            ]),
+      ]
+
+    # Create job
+    job = self._CreateJob(queue, job_id, ops)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertFalse(job.cur_opctx)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+      self.assertFalse(job.cur_opctx)
+
+      # Job is running, cancelling shouldn't be possible
+      (success, _) = job.Cancel()
+      self.assertFalse(success)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    for idx in range(len(ops)):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      if idx == len(ops) - 1:
+        # Last opcode
+        self.assert_(result)
+      else:
+        self.assertFalse(result)
+
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+        self.assert_(job.start_timestamp)
+        self.assertFalse(job.end_timestamp)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    for idx, submitted_ops in enumerate(job_ops
+                                        for op in ops
+                                        for job_ops in op.submit_jobs):
+      self.assertEqual(queue.GetNextSubmittedJob(),
+                       (1000 + idx, submitted_ops))
+    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[[], [1000], [1001, 1002, 1003]]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+
+    self._GenericCheckJob(job)
+
+    # Calling the processor on a finished job should be a no-op
+    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+
+class _FakeTimeoutStrategy:
+  def __init__(self, timeouts):
+    self.timeouts = timeouts
+    self.attempts = 0
+    self.last_timeout = None
+
+  def NextAttempt(self):
+    self.attempts += 1
+    if self.timeouts:
+      timeout = self.timeouts.pop(0)
+    else:
+      timeout = None
+    self.last_timeout = timeout
+    return timeout
+
+
+class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
+  def setUp(self):
+    self.queue = _FakeQueueForProc()
+    self.job = None
+    self.curop = None
+    self.opcounter = None
+    self.timeout_strategy = None
+    self.retries = 0
+    self.prev_tsop = None
+    self.prev_prio = None
+    self.prev_status = None
+    self.lock_acq_prio = None
+    self.gave_lock = None
+    self.done_lock_before_blocking = False
+
+  def _BeforeStart(self, timeout, priority):
+    job = self.job
+
+    # If status has changed, job must've been written
+    if self.prev_status != self.job.ops[self.curop].status:
+      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    ts = self.timeout_strategy
+
+    self.assert_(timeout is None or isinstance(timeout, (int, float)))
+    self.assertEqual(timeout, ts.last_timeout)
+    self.assertEqual(priority, job.ops[self.curop].priority)
+
+    self.gave_lock = True
+    self.lock_acq_prio = priority
+
+    if (self.curop == 3 and
+        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
+      # Give locks before running into blocking acquire
+      assert self.retries == 7
+      self.retries = 0
+      self.done_lock_before_blocking = True
+      return
+
+    if self.retries > 0:
+      self.assert_(timeout is not None)
+      self.retries -= 1
+      self.gave_lock = False
+      raise mcpu.LockAcquireTimeout()
+
+    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
+      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
+      assert not ts.timeouts
+      self.assert_(timeout is None)
+
+  def _AfterStart(self, op, cbs):
+    job = self.job
+
+    # Setting to "running" requires an update
+    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Job is running, cancelling shouldn't be possible
+    (success, _) = job.Cancel()
+    self.assertFalse(success)
+
+  def _NextOpcode(self):
+    self.curop = self.opcounter.next()
+    self.prev_prio = self.job.ops[self.curop].priority
+    self.prev_status = self.job.ops[self.curop].status
+
+  def _NewTimeoutStrategy(self):
+    job = self.job
+
+    self.assertEqual(self.retries, 0)
+
+    if self.prev_tsop == self.curop:
+      # Still on the same opcode, priority must've been increased
+      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
+
+    if self.curop == 1:
+      # Normal retry
+      timeouts = range(10, 31, 10)
+      self.retries = len(timeouts) - 1
+
+    elif self.curop == 2:
+      # Let this run into a blocking acquire
+      timeouts = range(11, 61, 12)
+      self.retries = len(timeouts)
+
+    elif self.curop == 3:
+      # Wait for priority to increase, but give lock before blocking acquire
+      timeouts = range(12, 100, 14)
+      self.retries = len(timeouts)
+
+      self.assertFalse(self.done_lock_before_blocking)
+
+    elif self.curop == 4:
+      self.assert_(self.done_lock_before_blocking)
+
+      # Timeouts, but no need to retry
+      timeouts = range(10, 31, 10)
+      self.retries = 0
+
+    elif self.curop == 5:
+      # Normal retry
+      timeouts = range(19, 100, 11)
+      self.retries = len(timeouts)
+
+    else:
+      timeouts = []
+      self.retries = 0
+
+    assert len(job.ops) == 10
+    assert self.retries <= len(timeouts)
+
+    ts = _FakeTimeoutStrategy(timeouts)
+
+    self.timeout_strategy = ts
+    self.prev_tsop = self.curop
+    self.prev_prio = job.ops[self.curop].priority
+
+    return ts
+
+  def testTimeout(self):
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(10)]
+
+    # Create job
+    job_id = 15801
+    job = self._CreateJob(self.queue, job_id, ops)
+    self.job = job
+
+    self.opcounter = itertools.count(0)
+
+    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
+                                    self._AfterStart)
+    tsf = self._NewTimeoutStrategy
+
+    self.assertFalse(self.done_lock_before_blocking)
+
+    while True:
+      proc = jqueue._JobProcessor(self.queue, opexec, job,
+                                  _timeout_strategy_factory=tsf)
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      if self.curop is not None:
+        self.prev_status = self.job.ops[self.curop].status
+
+      self.lock_acq_prio = None
+
+      result = proc(_nextop_fn=self._NextOpcode)
+      assert self.curop is not None
+
+      if result or self.gave_lock:
+        # Got lock and/or job is done, result must've been written
+        self.assertFalse(job.cur_opctx)
+        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, self.queue.GetNextUpdate)
+        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
+        self.assert_(job.ops[self.curop].exec_timestamp)
+
+      if result:
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertFalse(result)
+
+      if self.curop == 0:
+        self.assertEqual(job.ops[self.curop].start_timestamp,
+                         job.start_timestamp)
+
+      if self.gave_lock:
+        # Opcode finished, but job not yet done
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      else:
+        # Did not get locks
+        self.assert_(job.cur_opctx)
+        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
+                         self.timeout_strategy.NextAttempt)
+        self.assertFalse(job.ops[self.curop].exec_timestamp)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+        # If priority has changed since acquiring locks, the job must've been
+        # updated
+        if self.lock_acq_prio != job.ops[self.curop].priority:
+          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(self.curop, len(job.ops) - 1)
+    self.assertEqual(self.job, job)
+    self.assertEqual(self.opcounter.next(), len(job.ops))
+    self.assert_(self.done_lock_before_blocking)
+
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    # Calling the processor on a finished job should be a no-op
+    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
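
For readers following testCalcStatus in the diff above, here is a minimal, standalone sketch of the status-aggregation rules those assertions encode. It is not the real jqueue._QueuedJob.CalcStatus implementation: the calc_job_status helper and the plain-string status constants are illustrative placeholders chosen so the snippet runs without the ganeti package, and job and opcode statuses are conflated into one set of names for brevity.

# Illustrative sketch only, assuming the precedence implied by testCalcStatus:
# canceling/canceled/error override everything, waitlock/running override
# queued, and "success" is reported only when every opcode succeeded.
OP_QUEUED, OP_WAITLOCK, OP_RUNNING = "queued", "waiting", "running"
OP_CANCELING, OP_CANCELED, OP_ERROR, OP_SUCCESS = \
    "canceling", "canceled", "error", "success"


def calc_job_status(op_statuses):
    """Derive a job status from its opcode statuses (illustrative only)."""
    status = OP_QUEUED
    all_success = True

    for op_status in op_statuses:
        if op_status == OP_SUCCESS:
            continue
        all_success = False

        if op_status == OP_QUEUED:
            pass
        elif op_status == OP_WAITLOCK:
            status = OP_WAITLOCK
        elif op_status == OP_RUNNING:
            status = OP_RUNNING
        elif op_status == OP_CANCELING:
            # One canceling opcode makes the whole job "canceling"
            status = OP_CANCELING
            break
        elif op_status == OP_CANCELED:
            status = OP_CANCELED
            break
        elif op_status == OP_ERROR:
            # A single failed opcode fails the whole job
            status = OP_ERROR
            break

    if all_success:
        status = OP_SUCCESS
    return status


# The same combinations exercised by testCalcStatus:
assert calc_job_status([OP_QUEUED] * 3) == OP_QUEUED
assert calc_job_status([OP_SUCCESS, OP_RUNNING, OP_QUEUED]) == OP_RUNNING
assert calc_job_status([OP_SUCCESS, OP_SUCCESS, OP_CANCELING]) == OP_CANCELING
assert calc_job_status([OP_SUCCESS] * 4 + [OP_ERROR]) == OP_ERROR
assert calc_job_status([OP_SUCCESS] * 3) == OP_SUCCESS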