+ def testSubmitManyJobs(self):
+ queue = _FakeQueueForProc()
+
+ job_id = 15656
+ ops = [
+ opcodes.OpTestDummy(result="Res0", fail=False,
+ submit_jobs=[]),
+ opcodes.OpTestDummy(result="Res1", fail=False,
+ submit_jobs=[
+ [opcodes.OpTestDummy(result="r1j0", fail=False)],
+ ]),
+ opcodes.OpTestDummy(result="Res2", fail=False,
+ submit_jobs=[
+ [opcodes.OpTestDummy(result="r2j0o0", fail=False),
+ opcodes.OpTestDummy(result="r2j0o1", fail=False),
+ opcodes.OpTestDummy(result="r2j0o2", fail=False),
+ opcodes.OpTestDummy(result="r2j0o3", fail=False)],
+ [opcodes.OpTestDummy(result="r2j1", fail=False)],
+ [opcodes.OpTestDummy(result="r2j3o0", fail=False),
+ opcodes.OpTestDummy(result="r2j3o1", fail=False)],
+ ]),
+ ]
+
+ # Create job
+ job = self._CreateJob(queue, job_id, ops)
+
+ def _BeforeStart(timeout, priority):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertFalse(job.cur_opctx)
+
+ def _AfterStart(op, cbs):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertFalse(job.cur_opctx)
+
+ # Job is running, cancelling shouldn't be possible
+ (success, _) = job.Cancel()
+ self.assertFalse(success)
+
+ opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+ for idx in range(len(ops)):
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ result = jqueue._JobProcessor(queue, opexec, job)()
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ if idx == len(ops) - 1:
+ # Last opcode
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
+ else:
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ for idx, submitted_ops in enumerate(job_ops
+ for op in ops
+ for job_ops in op.submit_jobs):
+ self.assertEqual(queue.GetNextSubmittedJob(),
+ (1000 + idx, submitted_ops))
+ self.assertRaises(IndexError, queue.GetNextSubmittedJob)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+ self.assertEqual(job.GetInfo(["opresult"]),
+ [[[], [1000], [1001, 1002, 1003]]])
+ self.assertEqual(job.GetInfo(["opstatus"]),
+ [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+
+ self._GenericCheckJob(job)
+
+ # Calling the processor on a finished job should be a no-op
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ def testJobDependency(self):
+ depmgr = _FakeDependencyManager()
+ queue = _FakeQueueForProc(depmgr=depmgr)
+
+ self.assertEqual(queue.depmgr, depmgr)
+
+ prev_job_id = 22113
+ prev_job_id2 = 28102
+ job_id = 29929
+ ops = [
+ opcodes.OpTestDummy(result="Res0", fail=False,
+ depends=[
+ [prev_job_id2, None],
+ [prev_job_id, None],
+ ]),
+ opcodes.OpTestDummy(result="Res1", fail=False),
+ ]
+
+ # Create job
+ job = self._CreateJob(queue, job_id, ops)
+
+ def _BeforeStart(timeout, priority):
+ if attempt == 0 or attempt > 5:
+ # Job should only be updated when it wasn't waiting for another job
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertFalse(job.cur_opctx)
+
+ def _AfterStart(op, cbs):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertFalse(job.cur_opctx)
+
+ # Job is running, cancelling shouldn't be possible
+ (success, _) = job.Cancel()
+ self.assertFalse(success)
+
+ opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+ counter = itertools.count()
+ while True:
+ attempt = counter.next()
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ if attempt < 2:
+ depmgr.AddCheckResult(job, prev_job_id2, None,
+ (jqueue._JobDependencyManager.WAIT, "wait2"))
+ elif attempt == 2:
+ depmgr.AddCheckResult(job, prev_job_id2, None,
+ (jqueue._JobDependencyManager.CONTINUE, "cont"))
+ # The processor will ask for the next dependency immediately
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.WAIT, "wait"))
+ elif attempt < 5:
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.WAIT, "wait"))
+ elif attempt == 5:
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.CONTINUE, "cont"))
+ if attempt == 2:
+ self.assertEqual(depmgr.CountPendingResults(), 2)
+ elif attempt > 5:
+ self.assertEqual(depmgr.CountPendingResults(), 0)
+ else:
+ self.assertEqual(depmgr.CountPendingResults(), 1)
+
+ result = jqueue._JobProcessor(queue, opexec, job)()
+ if attempt == 0 or attempt >= 5:
+ # Job should only be updated if there was an actual change
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(depmgr.CountPendingResults())
+
+ if attempt < 5:
+ # Simulate waiting for other job
+ self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
+ self.assertTrue(job.cur_opctx)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+ continue
+
+ if result == jqueue._JobProcessor.FINISHED:
+ # Last opcode
+ self.assertFalse(job.cur_opctx)
+ break
+
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+ self.assertEqual(job.GetInfo(["opresult"]),
+ [[op.input.result for op in job.ops]])
+ self.assertEqual(job.GetInfo(["opstatus"]),
+ [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+ self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
+ for op in job.ops))
+
+ self._GenericCheckJob(job)
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+ self.assertFalse(depmgr.CountPendingResults())
+ self.assertFalse(depmgr.CountWaitingJobs())
+
+ # Calling the processor on a finished job should be a no-op
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ def testJobDependencyCancel(self):
+ depmgr = _FakeDependencyManager()
+ queue = _FakeQueueForProc(depmgr=depmgr)
+
+ self.assertEqual(queue.depmgr, depmgr)
+
+ prev_job_id = 13623
+ job_id = 30876
+ ops = [
+ opcodes.OpTestDummy(result="Res0", fail=False),
+ opcodes.OpTestDummy(result="Res1", fail=False,
+ depends=[
+ [prev_job_id, None],
+ ]),
+ opcodes.OpTestDummy(result="Res2", fail=False),
+ ]
+
+ # Create job
+ job = self._CreateJob(queue, job_id, ops)
+
+ def _BeforeStart(timeout, priority):
+ if attempt == 0 or attempt > 5:
+ # Job should only be updated when it wasn't waiting for another job
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertFalse(job.cur_opctx)
+
+ def _AfterStart(op, cbs):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertFalse(job.cur_opctx)
+
+ # Job is running, cancelling shouldn't be possible
+ (success, _) = job.Cancel()
+ self.assertFalse(success)
+
+ opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+ counter = itertools.count()
+ while True:
+ attempt = counter.next()
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ if attempt == 0:
+ # This will handle the first opcode
+ pass
+ elif attempt < 4:
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.WAIT, "wait"))
+ elif attempt == 4:
+ # Other job was cancelled
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.CANCEL, "cancel"))
+
+ if attempt == 0:
+ self.assertEqual(depmgr.CountPendingResults(), 0)
+ else:
+ self.assertEqual(depmgr.CountPendingResults(), 1)
+
+ result = jqueue._JobProcessor(queue, opexec, job)()
+ if attempt <= 1 or attempt >= 4:
+ # Job should only be updated if there was an actual change
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(depmgr.CountPendingResults())
+
+ if attempt > 0 and attempt < 4:
+ # Simulate waiting for other job
+ self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
+ self.assertTrue(job.cur_opctx)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+ continue
+
+ if result == jqueue._JobProcessor.FINISHED:
+ # Last opcode
+ self.assertFalse(job.cur_opctx)
+ break
+
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+ self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+ [[constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_CANCELED,
+ constants.OP_STATUS_CANCELED],
+ ["Res0", "Job canceled by request",
+ "Job canceled by request"]])
+
+ self._GenericCheckJob(job)
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+ self.assertFalse(depmgr.CountPendingResults())
+
+ # Calling the processor on a finished job should be a no-op
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ def testJobDependencyWrongstatus(self):
+ depmgr = _FakeDependencyManager()
+ queue = _FakeQueueForProc(depmgr=depmgr)
+
+ self.assertEqual(queue.depmgr, depmgr)
+
+ prev_job_id = 9741
+ job_id = 11763
+ ops = [
+ opcodes.OpTestDummy(result="Res0", fail=False),
+ opcodes.OpTestDummy(result="Res1", fail=False,
+ depends=[
+ [prev_job_id, None],
+ ]),
+ opcodes.OpTestDummy(result="Res2", fail=False),
+ ]
+
+ # Create job
+ job = self._CreateJob(queue, job_id, ops)
+
+ def _BeforeStart(timeout, priority):
+ if attempt == 0 or attempt > 5:
+ # Job should only be updated when it wasn't waiting for another job
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertFalse(job.cur_opctx)
+
+ def _AfterStart(op, cbs):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertFalse(job.cur_opctx)
+
+ # Job is running, cancelling shouldn't be possible
+ (success, _) = job.Cancel()
+ self.assertFalse(success)
+
+ opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+ counter = itertools.count()
+ while True:
+ attempt = counter.next()
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ if attempt == 0:
+ # This will handle the first opcode
+ pass
+ elif attempt < 4:
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.WAIT, "wait"))
+ elif attempt == 4:
+ # Other job failed
+ depmgr.AddCheckResult(job, prev_job_id, None,
+ (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
+
+ if attempt == 0:
+ self.assertEqual(depmgr.CountPendingResults(), 0)
+ else:
+ self.assertEqual(depmgr.CountPendingResults(), 1)
+
+ result = jqueue._JobProcessor(queue, opexec, job)()
+ if attempt <= 1 or attempt >= 4:
+ # Job should only be updated if there was an actual change
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(depmgr.CountPendingResults())
+
+ if attempt > 0 and attempt < 4:
+ # Simulate waiting for other job
+ self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
+ self.assertTrue(job.cur_opctx)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+ continue
+
+ if result == jqueue._JobProcessor.FINISHED:
+ # Last opcode
+ self.assertFalse(job.cur_opctx)
+ break
+
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
+ self.assertEqual(job.GetInfo(["opstatus"]),
+ [[constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_ERROR,
+ constants.OP_STATUS_ERROR]]),
+
+ (opresult, ) = job.GetInfo(["opresult"])
+ self.assertEqual(len(opresult), len(ops))
+ self.assertEqual(opresult[0], "Res0")
+ self.assertTrue(errors.GetEncodedError(opresult[1]))
+ self.assertTrue(errors.GetEncodedError(opresult[2]))
+
+ self._GenericCheckJob(job)
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+ self.assertFalse(depmgr.CountPendingResults())
+
+ # Calling the processor on a finished job should be a no-op
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+