def __init__(self):
self._acquired = False
self._updates = []
+ self._submitted = []
+
+ self._submit_count = itertools.count(1000)
def IsAcquired(self):
return self._acquired
def GetNextUpdate(self):
return self._updates.pop(0)
+ def GetNextSubmittedJob(self):
+ return self._submitted.pop(0)
+
def acquire(self, shared=0):
assert shared == 1
self._acquired = True
assert self._acquired, "Lock not acquired while updating job"
self._updates.append((job, bool(replicate)))
+ def SubmitManyJobs(self, jobs):
+ assert not self._acquired, "Lock acquired while submitting jobs"
+ job_ids = [self._submit_count.next() for _ in jobs]
+ self._submitted.extend(zip(job_ids, jobs))
+ return job_ids
+
class _FakeExecOpCodeForProc:
def __init__(self, queue, before_start, after_start):
if op.fail:
raise errors.OpExecError("Error requested (%s)" % op.result)
+ if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
+ return cbs.SubmitManyJobs(op.submit_jobs)
+
return op.result
self.assertFalse(job.GetLogEntries(count))
self.assertFalse(job.GetLogEntries(count + 3))
+ def testSubmitManyJobs(self):
+ queue = _FakeQueueForProc()
+
+ job_id = 15656
+ ops = [
+ opcodes.OpTestDummy(result="Res0", fail=False,
+ submit_jobs=[]),
+ opcodes.OpTestDummy(result="Res1", fail=False,
+ submit_jobs=[
+ [opcodes.OpTestDummy(result="r1j0", fail=False)],
+ ]),
+ opcodes.OpTestDummy(result="Res2", fail=False,
+ submit_jobs=[
+ [opcodes.OpTestDummy(result="r2j0o0", fail=False),
+ opcodes.OpTestDummy(result="r2j0o1", fail=False),
+ opcodes.OpTestDummy(result="r2j0o2", fail=False),
+ opcodes.OpTestDummy(result="r2j0o3", fail=False)],
+ [opcodes.OpTestDummy(result="r2j1", fail=False)],
+ [opcodes.OpTestDummy(result="r2j3o0", fail=False),
+ opcodes.OpTestDummy(result="r2j3o1", fail=False)],
+ ]),
+ ]
+
+ # Create job
+ job = self._CreateJob(queue, job_id, ops)
+
+ def _BeforeStart(timeout, priority):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertFalse(job.cur_opctx)
+
+ def _AfterStart(op, cbs):
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertFalse(job.cur_opctx)
+
+ # Job is running, cancelling shouldn't be possible
+ (success, _) = job.Cancel()
+ self.assertFalse(success)
+
+ opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+ for idx in range(len(ops)):
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ result = jqueue._JobProcessor(queue, opexec, job)()
+ self.assertEqual(queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+ if idx == len(ops) - 1:
+ # Last opcode
+ self.assert_(result)
+ else:
+ self.assertFalse(result)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assert_(job.start_timestamp)
+ self.assertFalse(job.end_timestamp)
+
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
+ for idx, submitted_ops in enumerate(job_ops
+ for op in ops
+ for job_ops in op.submit_jobs):
+ self.assertEqual(queue.GetNextSubmittedJob(),
+ (1000 + idx, submitted_ops))
+ self.assertRaises(IndexError, queue.GetNextSubmittedJob)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+ self.assertEqual(job.GetInfo(["opresult"]),
+ [[[], [1000], [1001, 1002, 1003]]])
+ self.assertEqual(job.GetInfo(["opstatus"]),
+ [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+
+ self._GenericCheckJob(job)
+
+ # Finished jobs can't be processed any further
+ self.assertRaises(errors.ProgrammerError,
+ jqueue._JobProcessor(queue, opexec, job))
+
class _FakeTimeoutStrategy:
def __init__(self, timeouts):