X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1e6d57506fd81b1caf14d7d7911f08ac1777e8e5..545d036273bb778751ac6de64a8c5dca9f7cb5cf:/test/ganeti.jqueue_unittest.py diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index a0dd025..f2ffc8a 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -28,6 +28,7 @@ import tempfile import shutil import errno import itertools +import random from ganeti import constants from ganeti import utils @@ -36,6 +37,7 @@ from ganeti import jqueue from ganeti import opcodes from ganeti import compat from ganeti import mcpu +from ganeti import query import testutils @@ -43,6 +45,7 @@ import testutils class _FakeJob: def __init__(self, job_id, status): self.id = job_id + self.writable = False self._status = status self._log = [] @@ -200,7 +203,7 @@ class TestWaitForJobChangesHelper(unittest.TestCase): shutil.rmtree(self.tmpdir) def _LoadWaitingJob(self): - return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK) + return _FakeJob(2614226563, constants.JOB_STATUS_WAITING) def _LoadLostJob(self): return None @@ -210,13 +213,13 @@ class TestWaitForJobChangesHelper(unittest.TestCase): # No change self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"], - [constants.JOB_STATUS_WAITLOCK], None, 0.1), + [constants.JOB_STATUS_WAITING], None, 0.1), constants.JOB_NOTCHANGED) # No previous information self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"], None, None, 1.0), - ([constants.JOB_STATUS_WAITLOCK], [])) + ([constants.JOB_STATUS_WAITING], [])) def testLostJob(self): wfjc = jqueue._WaitForJobChangesHelper() @@ -279,7 +282,7 @@ class TestQueuedOpCode(unittest.TestCase): class TestQueuedJob(unittest.TestCase): def test(self): self.assertRaises(errors.GenericError, jqueue._QueuedJob, - None, 1, []) + None, 1, [], False) def testDefaults(self): job_id = 4260 @@ -289,6 +292,7 @@ class TestQueuedJob(unittest.TestCase): ] def _Check(job): + 
self.assertTrue(job.writable) self.assertEqual(job.id, job_id) self.assertEqual(job.log_serial, 0) self.assert_(job.received_timestamp) @@ -305,12 +309,19 @@ class TestQueuedJob(unittest.TestCase): self.assertEqual(job.GetInfo(["summary"]), [[op.input.Summary() for op in job.ops]]) - job1 = jqueue._QueuedJob(None, job_id, ops) + job1 = jqueue._QueuedJob(None, job_id, ops, True) _Check(job1) - job2 = jqueue._QueuedJob.Restore(None, job1.Serialize()) + job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True) _Check(job2) self.assertEqual(job1.Serialize(), job2.Serialize()) + def testWritable(self): + job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False) + self.assertFalse(job.writable) + + job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True) + self.assertTrue(job.writable) + def testPriority(self): job_id = 4283 ops = [ @@ -323,7 +334,7 @@ class TestQueuedJob(unittest.TestCase): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) self.assert_(repr(job).startswith("<")) - job = jqueue._QueuedJob(None, job_id, ops) + job = jqueue._QueuedJob(None, job_id, ops, True) _Check(job) self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT for op in job.ops)) @@ -355,12 +366,12 @@ class TestQueuedJob(unittest.TestCase): for op in ops)) def _Waitlock1(ops): - ops[0].status = constants.OP_STATUS_WAITLOCK + ops[0].status = constants.OP_STATUS_WAITING def _Waitlock2(ops): ops[0].status = constants.OP_STATUS_SUCCESS ops[1].status = constants.OP_STATUS_SUCCESS - ops[2].status = constants.OP_STATUS_WAITLOCK + ops[2].status = constants.OP_STATUS_WAITING def _Running(ops): ops[0].status = constants.OP_STATUS_SUCCESS @@ -399,7 +410,7 @@ class TestQueuedJob(unittest.TestCase): tests = { constants.JOB_STATUS_QUEUED: [_Queued], - constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2], + constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2], constants.JOB_STATUS_RUNNING: [_Running], constants.JOB_STATUS_CANCELING: [_Canceling1, 
_Canceling2], constants.JOB_STATUS_CANCELED: [_Canceled], @@ -409,7 +420,8 @@ class TestQueuedJob(unittest.TestCase): def _NewJob(): job = jqueue._QueuedJob(None, 1, - [opcodes.OpTestDelay() for _ in range(10)]) + [opcodes.OpTestDelay() for _ in range(10)], + True) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED for op in job.ops)) @@ -424,14 +436,71 @@ class TestQueuedJob(unittest.TestCase): self.assertEqual(job.CalcStatus(), status) -class _FakeQueueForProc: +class _FakeDependencyManager: def __init__(self): + self._checks = [] + self._notifications = [] + self._waiting = set() + + def AddCheckResult(self, job, dep_job_id, dep_status, result): + self._checks.append((job, dep_job_id, dep_status, result)) + + def CountPendingResults(self): + return len(self._checks) + + def CountWaitingJobs(self): + return len(self._waiting) + + def GetNextNotification(self): + return self._notifications.pop(0) + + def JobWaiting(self, job): + return job in self._waiting + + def CheckAndRegister(self, job, dep_job_id, dep_status): + (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0) + + assert exp_job == job + assert exp_dep_job_id == dep_job_id + assert exp_dep_status == dep_status + + (result_status, _) = result + + if result_status == jqueue._JobDependencyManager.WAIT: + self._waiting.add(job) + elif result_status == jqueue._JobDependencyManager.CONTINUE: + self._waiting.remove(job) + + return result + + def NotifyWaiters(self, job_id): + self._notifications.append(job_id) + + +class _DisabledFakeDependencyManager: + def JobWaiting(self, _): + return False + + def CheckAndRegister(self, *args): + assert False, "Should not be called" + + def NotifyWaiters(self, _): + pass + + +class _FakeQueueForProc: + def __init__(self, depmgr=None): self._acquired = False self._updates = [] self._submitted = [] self._submit_count = itertools.count(1000) + if depmgr: + self.depmgr = depmgr + 
else: + self.depmgr = _DisabledFakeDependencyManager() + def IsAcquired(self): return self._acquired @@ -493,7 +562,7 @@ class _FakeExecOpCodeForProc: class _JobProcessorTestUtils: def _CreateJob(self, queue, job_id, ops): - job = jqueue._QueuedJob(queue, job_id, ops) + job = jqueue._QueuedJob(queue, job_id, ops, True) self.assertFalse(job.start_timestamp) self.assertFalse(job.end_timestamp) self.assertEqual(len(ops), len(job.ops)) @@ -535,7 +604,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(queue.GetNextUpdate(), (job, True)) self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) self.assertFalse(job.cur_opctx) def _AfterStart(op, cbs): @@ -559,9 +628,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, queue.GetNextUpdate) if idx == len(ops) - 1: # Last opcode - self.assert_(result) + self.assertEqual(result, jqueue._JobProcessor.FINISHED) else: - self.assertFalse(result) + self.assertEqual(result, jqueue._JobProcessor.DEFER) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) self.assert_(job.start_timestamp) @@ -581,7 +650,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) # Calling the processor on a finished job should be a no-op - self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) self.assertRaises(IndexError, queue.GetNextUpdate) def testOpcodeError(self): @@ -620,10 +690,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): if idx in (failfrom, len(ops) - 1): # Last opcode - self.assert_(result) + self.assertEqual(result, jqueue._JobProcessor.FINISHED) break - self.assertFalse(result) + self.assertEqual(result, 
jqueue._JobProcessor.DEFER) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) @@ -659,7 +729,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) # Calling the processor on a finished job should be a no-op - self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) self.assertRaises(IndexError, queue.GetNextUpdate) def testCancelWhileInQueue(self): @@ -690,7 +761,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): # Simulate processor called in workerpool opexec = _FakeExecOpCodeForProc(queue, None, None) - self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) # Check result self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) @@ -719,11 +791,11 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - job.ops[0].status = constants.OP_STATUS_WAITLOCK + job.ops[0].status = constants.OP_STATUS_WAITING assert len(job.ops) == 5 - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) # Mark as cancelling (success, _) = job.Cancel() @@ -735,7 +807,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): for op in job.ops)) opexec = _FakeExecOpCodeForProc(queue, None, None) - self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) # Check result self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) @@ -764,7 +837,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(queue.GetNextUpdate(), (job, True)) self.assertRaises(IndexError, queue.GetNextUpdate) 
self.assertFalse(queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) # Mark as cancelled (success, _) = job.Cancel() @@ -783,7 +856,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) self.assertRaises(IndexError, queue.GetNextUpdate) - self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) self.assertEqual(queue.GetNextUpdate(), (job, True)) self.assertRaises(IndexError, queue.GetNextUpdate) @@ -812,7 +886,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): def _BeforeStart(timeout, priority): self.assertFalse(queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) # Mark as cancelled (success, _) = job.Cancel() @@ -829,7 +903,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) - self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) # Check result self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) @@ -858,7 +933,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): opexec = _FakeExecOpCodeForProc(queue, None, None) # Run one opcode - self.assertFalse(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.DEFER) # Job goes back to queued self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) @@ -873,7 +949,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assert_(success) # Try processing another opcode (this will actually cancel the job) - 
self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) # Check result self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) @@ -903,7 +980,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) for _ in range(successcount): - self.assertFalse(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.DEFER) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) self.assertEqual(job.GetInfo(["opstatus"]), @@ -915,7 +993,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assert_(job.ops_iter) # Serialize and restore (simulates program restart) - newjob = jqueue._QueuedJob.Restore(queue, job.Serialize()) + newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True) self.assertFalse(newjob.ops_iter) self._TestPartial(newjob, successcount) @@ -935,10 +1013,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): if remaining == 0: # Last opcode - self.assert_(result) + self.assertEqual(result, jqueue._JobProcessor.FINISHED) break - self.assertFalse(result) + self.assertEqual(result, jqueue._JobProcessor.DEFER) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) @@ -955,12 +1033,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) # Calling the processor on a finished job should be a no-op - self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) self.assertRaises(IndexError, queue.GetNextUpdate) # ... 
also after being restored - job2 = jqueue._QueuedJob.Restore(queue, job.Serialize()) - self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)()) + job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True) + # Calling the processor on a finished job should be a no-op + self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(), + jqueue._JobProcessor.FINISHED) self.assertRaises(IndexError, queue.GetNextUpdate) def testProcessorOnRunningJob(self): @@ -1010,7 +1091,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(queue.GetNextUpdate(), (job, True)) self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) def _AfterStart(op, cbs): self.assertEqual(queue.GetNextUpdate(), (job, True)) @@ -1041,10 +1122,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): if remaining == 0: # Last opcode - self.assert_(result) + self.assertEqual(result, jqueue._JobProcessor.FINISHED) break - self.assertFalse(result) + self.assertEqual(result, jqueue._JobProcessor.DEFER) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) @@ -1059,7 +1140,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._CheckLogMessages(job, logmsgcount) # Serialize and restore (simulates program restart) - newjob = jqueue._QueuedJob.Restore(queue, job.Serialize()) + newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True) self._CheckLogMessages(newjob, logmsgcount) # Check each message @@ -1125,7 +1206,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(queue.GetNextUpdate(), (job, True)) self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), 
constants.JOB_STATUS_WAITING) self.assertFalse(job.cur_opctx) def _AfterStart(op, cbs): @@ -1149,9 +1230,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, queue.GetNextUpdate) if idx == len(ops) - 1: # Last opcode - self.assert_(result) + self.assertEqual(result, jqueue._JobProcessor.FINISHED) else: - self.assertFalse(result) + self.assertEqual(result, jqueue._JobProcessor.DEFER) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) self.assert_(job.start_timestamp) @@ -1176,7 +1257,371 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) # Calling the processor on a finished job should be a no-op - self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) + self.assertRaises(IndexError, queue.GetNextUpdate) + + def testJobDependency(self): + depmgr = _FakeDependencyManager() + queue = _FakeQueueForProc(depmgr=depmgr) + + self.assertEqual(queue.depmgr, depmgr) + + prev_job_id = 22113 + prev_job_id2 = 28102 + job_id = 29929 + ops = [ + opcodes.OpTestDummy(result="Res0", fail=False, + depends=[ + [prev_job_id2, None], + [prev_job_id, None], + ]), + opcodes.OpTestDummy(result="Res1", fail=False), + ] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + def _BeforeStart(timeout, priority): + if attempt == 0 or attempt > 5: + # Job should only be updated when it wasn't waiting for another job + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + self.assertFalse(job.cur_opctx) + + def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), 
constants.JOB_STATUS_RUNNING) + self.assertFalse(job.cur_opctx) + + # Job is running, cancelling shouldn't be possible + (success, _) = job.Cancel() + self.assertFalse(success) + + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + counter = itertools.count() + while True: + attempt = counter.next() + + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertRaises(IndexError, depmgr.GetNextNotification) + + if attempt < 2: + depmgr.AddCheckResult(job, prev_job_id2, None, + (jqueue._JobDependencyManager.WAIT, "wait2")) + elif attempt == 2: + depmgr.AddCheckResult(job, prev_job_id2, None, + (jqueue._JobDependencyManager.CONTINUE, "cont")) + # The processor will ask for the next dependency immediately + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.WAIT, "wait")) + elif attempt < 5: + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.WAIT, "wait")) + elif attempt == 5: + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.CONTINUE, "cont")) + if attempt == 2: + self.assertEqual(depmgr.CountPendingResults(), 2) + elif attempt > 5: + self.assertEqual(depmgr.CountPendingResults(), 0) + else: + self.assertEqual(depmgr.CountPendingResults(), 1) + + result = jqueue._JobProcessor(queue, opexec, job)() + if attempt == 0 or attempt >= 5: + # Job should only be updated if there was an actual change + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(depmgr.CountPendingResults()) + + if attempt < 5: + # Simulate waiting for other job + self.assertEqual(result, jqueue._JobProcessor.WAITDEP) + self.assertTrue(job.cur_opctx) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + self.assertRaises(IndexError, depmgr.GetNextNotification) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + 
continue + + if result == jqueue._JobProcessor.FINISHED: + # Last opcode + self.assertFalse(job.cur_opctx) + break + + self.assertRaises(IndexError, depmgr.GetNextNotification) + + self.assertEqual(result, jqueue._JobProcessor.DEFER) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) + self.assertEqual(job.GetInfo(["opresult"]), + [[op.input.result for op in job.ops]]) + self.assertEqual(job.GetInfo(["opstatus"]), + [len(job.ops) * [constants.OP_STATUS_SUCCESS]]) + self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops)) + + self._GenericCheckJob(job) + + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertRaises(IndexError, depmgr.GetNextNotification) + self.assertFalse(depmgr.CountPendingResults()) + self.assertFalse(depmgr.CountWaitingJobs()) + + # Calling the processor on a finished job should be a no-op + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) + self.assertRaises(IndexError, queue.GetNextUpdate) + + def testJobDependencyCancel(self): + depmgr = _FakeDependencyManager() + queue = _FakeQueueForProc(depmgr=depmgr) + + self.assertEqual(queue.depmgr, depmgr) + + prev_job_id = 13623 + job_id = 30876 + ops = [ + opcodes.OpTestDummy(result="Res0", fail=False), + opcodes.OpTestDummy(result="Res1", fail=False, + depends=[ + [prev_job_id, None], + ]), + opcodes.OpTestDummy(result="Res2", fail=False), + ] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + def _BeforeStart(timeout, priority): + if attempt == 0 or attempt > 5: + # Job should only be updated when it wasn't waiting for another job + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + 
self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + self.assertFalse(job.cur_opctx) + + def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + self.assertFalse(job.cur_opctx) + + # Job is running, cancelling shouldn't be possible + (success, _) = job.Cancel() + self.assertFalse(success) + + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + counter = itertools.count() + while True: + attempt = counter.next() + + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertRaises(IndexError, depmgr.GetNextNotification) + + if attempt == 0: + # This will handle the first opcode + pass + elif attempt < 4: + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.WAIT, "wait")) + elif attempt == 4: + # Other job was cancelled + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.CANCEL, "cancel")) + + if attempt == 0: + self.assertEqual(depmgr.CountPendingResults(), 0) + else: + self.assertEqual(depmgr.CountPendingResults(), 1) + + result = jqueue._JobProcessor(queue, opexec, job)() + if attempt <= 1 or attempt >= 4: + # Job should only be updated if there was an actual change + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(depmgr.CountPendingResults()) + + if attempt > 0 and attempt < 4: + # Simulate waiting for other job + self.assertEqual(result, jqueue._JobProcessor.WAITDEP) + self.assertTrue(job.cur_opctx) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + self.assertRaises(IndexError, depmgr.GetNextNotification) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + 
continue + + if result == jqueue._JobProcessor.FINISHED: + # Last opcode + self.assertFalse(job.cur_opctx) + break + + self.assertRaises(IndexError, depmgr.GetNextNotification) + + self.assertEqual(result, jqueue._JobProcessor.DEFER) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED]) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_CANCELED, + constants.OP_STATUS_CANCELED], + ["Res0", "Job canceled by request", + "Job canceled by request"]]) + + self._GenericCheckJob(job) + + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertRaises(IndexError, depmgr.GetNextNotification) + self.assertFalse(depmgr.CountPendingResults()) + + # Calling the processor on a finished job should be a no-op + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) + self.assertRaises(IndexError, queue.GetNextUpdate) + + def testJobDependencyWrongstatus(self): + depmgr = _FakeDependencyManager() + queue = _FakeQueueForProc(depmgr=depmgr) + + self.assertEqual(queue.depmgr, depmgr) + + prev_job_id = 9741 + job_id = 11763 + ops = [ + opcodes.OpTestDummy(result="Res0", fail=False), + opcodes.OpTestDummy(result="Res1", fail=False, + depends=[ + [prev_job_id, None], + ]), + opcodes.OpTestDummy(result="Res2", fail=False), + ] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + def _BeforeStart(timeout, priority): + if attempt == 0 or attempt > 5: + # Job should only be updated when it wasn't waiting for another job + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + 
self.assertFalse(job.cur_opctx) + + def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + self.assertFalse(job.cur_opctx) + + # Job is running, cancelling shouldn't be possible + (success, _) = job.Cancel() + self.assertFalse(success) + + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + counter = itertools.count() + while True: + attempt = counter.next() + + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertRaises(IndexError, depmgr.GetNextNotification) + + if attempt == 0: + # This will handle the first opcode + pass + elif attempt < 4: + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.WAIT, "wait")) + elif attempt == 4: + # Other job failed + depmgr.AddCheckResult(job, prev_job_id, None, + (jqueue._JobDependencyManager.WRONGSTATUS, "w")) + + if attempt == 0: + self.assertEqual(depmgr.CountPendingResults(), 0) + else: + self.assertEqual(depmgr.CountPendingResults(), 1) + + result = jqueue._JobProcessor(queue, opexec, job)() + if attempt <= 1 or attempt >= 4: + # Job should only be updated if there was an actual change + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(depmgr.CountPendingResults()) + + if attempt > 0 and attempt < 4: + # Simulate waiting for other job + self.assertEqual(result, jqueue._JobProcessor.WAITDEP) + self.assertTrue(job.cur_opctx) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + self.assertRaises(IndexError, depmgr.GetNextNotification) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + continue + + if result == jqueue._JobProcessor.FINISHED: + # Last opcode + self.assertFalse(job.cur_opctx) + break + + 
self.assertRaises(IndexError, depmgr.GetNextNotification) + + self.assertEqual(result, jqueue._JobProcessor.DEFER) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR]) + self.assertEqual(job.GetInfo(["opstatus"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_ERROR, + constants.OP_STATUS_ERROR]]), + + (opresult, ) = job.GetInfo(["opresult"]) + self.assertEqual(len(opresult), len(ops)) + self.assertEqual(opresult[0], "Res0") + self.assertTrue(errors.GetEncodedError(opresult[1])) + self.assertTrue(errors.GetEncodedError(opresult[2])) + + self._GenericCheckJob(job) + + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertRaises(IndexError, depmgr.GetNextNotification) + self.assertFalse(depmgr.CountPendingResults()) + + # Calling the processor on a finished job should be a no-op + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.FINISHED) self.assertRaises(IndexError, queue.GetNextUpdate) @@ -1220,7 +1665,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, self.queue.GetNextUpdate) self.assertFalse(self.queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) ts = self.timeout_strategy @@ -1353,7 +1798,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): result = proc(_nextop_fn=self._NextOpcode) assert self.curop is not None - if result or self.gave_lock: + if result == jqueue._JobProcessor.FINISHED or self.gave_lock: # Got lock and/or job is done, result must've been written self.assertFalse(job.cur_opctx) self.assertEqual(self.queue.GetNextUpdate(), (job, True)) @@ -1361,11 +1806,11 @@ class 
# NOTE: The lines in this comment block are unified-diff residue from hunks
# that modify TestJobProcessorTimeouts. That class starts before this
# section, so the residue is carried over as-is (one diff line per comment
# line, "-" removed / "+" added) instead of being reconstructed.
#
#   TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
#     self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
#     self.assert_(job.ops[self.curop].exec_timestamp)
#   - if result:
#   + if result == jqueue._JobProcessor.FINISHED:
#     self.assertFalse(job.cur_opctx)
#     break
#   - self.assertFalse(result)
#   + self.assertEqual(result, jqueue._JobProcessor.DEFER)
#     if self.curop == 0:
#     self.assertEqual(job.ops[self.curop].start_timestamp,
#   @@ -1380,7 +1825,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
#     self.assertEqual(job.cur_opctx._timeout_strategy._fn,
#     self.timeout_strategy.NextAttempt)
#     self.assertFalse(job.ops[self.curop].exec_timestamp)
#   - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
#   + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
#     # If priority has changed since acquiring locks, the job must've been
#     # updated
#   @@ -1408,9 +1853,307 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
#     for op in job.ops))
#     # Calling the processor on a finished job should be a no-op
#   - self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
#   + self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
#   + jqueue._JobProcessor.FINISHED)
#     self.assertRaises(IndexError, self.queue.GetNextUpdate)


class TestJobDependencyManager(unittest.TestCase):
  """Tests for jqueue._JobDependencyManager.

  The manager under test is created in setUp with two callbacks provided by
  this class: _GetStatus replays the (job ID, status) pairs queued in
  self._status, and _Enqueue records every batch of re-added jobs in
  self._queue.

  Test methods are described with comments rather than docstrings so the
  names reported by unittest stay unchanged.

  """
  class _FakeJob:
    """Job stand-in that only carries a string "id" attribute.

    """
    def __init__(self, job_id):
      self.id = str(job_id)

  def setUp(self):
    # Expected status lookups, consumed in FIFO order by _GetStatus
    self._status = []
    # Batches of jobs handed to _Enqueue
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Return the next queued status, checking it was queued for job_id.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Record re-added jobs, checking the manager lock is not held.

    """
    self.assertFalse(self.jdm._lock.is_owned(),
                     msg=("Must not own manager lock while re-adding jobs"
                          " (potential deadlock)"))
    self._queue.append(jobs)

  # Dependency still running => WAIT; once it is reported as canceled and no
  # specific end status was requested => CANCEL.
  def testNotFinalizedThenCancel(self):
    job = self._FakeJob(17697)
    job_id = str(28625)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/28625", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.CANCEL)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  # Canceled is the requested end status => CONTINUE once it is reached.
  def testRequireCancel(self):
    job = self._FakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/9610", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  # Error is the requested end status => CONTINUE once it is reached.
  def testRequireError(self):
    job = self._FakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  # Every status in constants.JOBS_FINALIZED is accepted; each one in turn is
  # used as the dependency's end status and must yield CONTINUE.
  def testRequireMultiple(self):
    dep_status = list(constants.JOBS_FINALIZED)

    for end_status in dep_status:
      job = self._FakeJob(21343)
      job_id = str(14609)

      self._status.append((job_id, constants.JOB_STATUS_WAITING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/14609", None, None, [("job", [job.id])])
        ])

      self._status.append((job_id, end_status))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.CONTINUE)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  # NotifyWaiters must drop the waiter entry and hand the waiting jobs to the
  # enqueue callback without asking for any job status.
  def testNotify(self):
    job = self._FakeJob(8227)
    job_id = str(4113)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  # Success requested, dependency ends with error => WRONGSTATUS.
  def testWrongStatus(self):
    job = self._FakeJob(10102)
    job_id = str(1271)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WRONGSTATUS)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  # Success requested, dependency ends with success => CONTINUE.
  def testCorrectStatus(self):
    job = self._FakeJob(24273)
    job_id = str(23885)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  # Dependency already successful on the first check => CONTINUE right away;
  # an empty waiter set is left behind until the next NotifyWaiters call.
  def testFinalizedRightAway(self):
    job = self._FakeJob(224)
    job_id = str(3081)

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  # Ten dependencies, each with 1..20 waiting jobs; dependencies are then
  # finalized in random order and the pending-lock info is checked each time.
  def testMultipleWaiting(self):
    # Use a deterministic random generator
    rnd = random.Random(21402)

    dep_count = 10
    max_waiters = 20
    dep_status = [constants.JOB_STATUS_SUCCESS]

    # Draw enough IDs for the worst case (one per dependency plus
    # "max_waiters" per dependency) so pop() below can never run dry. A real
    # list is needed as map() does not return one on every Python version.
    job_ids = [str(i)
               for i in rnd.sample(range(1, 10000),
                                   dep_count * (max_waiters + 1))]

    waiters = dict((job_ids.pop(),
                    set(map(self._FakeJob,
                            [job_ids.pop()
                             for _ in range(rnd.randint(1, max_waiters))])))
                   for _ in range(dep_count))

    # Ensure there are no duplicate job IDs
    self.assertFalse(utils.FindDuplicates(list(waiters) +
                                          [job.id
                                           for jobs in waiters.values()
                                           for job in jobs]))

    # Register all jobs as waiters
    for (job_id, jobs) in waiters.items():
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
        (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
        self.assertEqual(result, self.jdm.WAIT)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(entry):
      # Turn the pending job IDs into sets so their order does not matter
      (name, mode, owner_names, pending) = entry
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order; sample from a sorted list because a dictionary
    # is not a sequence and its iteration order is not guaranteed
    for job_id in rnd.sample(sorted(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
        (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
        self.assertEqual(result, self.jdm.CONTINUE)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    self.assertFalse(waiters)

  # A job naming itself as dependency => ERROR.
  def testSelfDependency(self):
    job = self._FakeJob(18937)

    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  # Status callback raising errors.JobLost => ERROR and nothing registered.
  def testJobDisappears(self):
    job = self._FakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager, as the one from setUp uses the replaying callback
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))


if __name__ == "__main__":
  testutils.GanetiTestProgram()