From: Michael Hanselmann Date: Fri, 25 Mar 2011 13:22:24 +0000 (+0100) Subject: Implement submitting jobs from logical units X-Git-Tag: v2.5.0beta1~479 X-Git-Url: https://code.grnet.gr/git/ganeti-local/commitdiff_plain/6a373640f87b69b67eddfdc69f5eea94bbac9eb9 Implement submitting jobs from logical units The design details can be seen in the design document (doc/design-lu-generated-jobs.rst). Signed-off-by: Michael Hanselmann Reviewed-by: René Nussbaumer --- diff --git a/lib/cmdlib.py b/lib/cmdlib.py index af51ec2..a35a54b 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -74,7 +74,28 @@ def _SupportsOob(cfg, node): return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] -# End types +class ResultWithJobs: + """Data container for LU results with jobs. + + Instances of this class returned from L{LogicalUnit.Exec} will be recognized + by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs + contained in the C{jobs} attribute and include the job IDs in the opcode + result. + + """ + def __init__(self, jobs, **kwargs): + """Initializes this class. + + Additional return values can be specified as keyword arguments. + + @type jobs: list of lists of L{opcodes.OpCode} + @param jobs: A list of lists of opcode objects + + """ + self.jobs = jobs + self.other = kwargs + + class LogicalUnit(object): """Logical Unit base class. 
diff --git a/lib/constants.py b/lib/constants.py index 2ae1a43..973914b 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -529,6 +529,9 @@ DISK_TRANSFER_CONNECT_TIMEOUT = 60 # Disk index separator DISK_SEPARATOR = _autoconf.DISK_SEPARATOR +#: Key for job IDs in opcode result +JOB_IDS_KEY = "jobs" + # runparts results (RUNPARTS_SKIP, RUNPARTS_RUN, diff --git a/lib/jqueue.py b/lib/jqueue.py index 2871c6a..6b60c5e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -540,6 +540,15 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): # Cancel here if we were asked to self._CheckCancel() + def SubmitManyJobs(self, jobs): + """Submits jobs for processing. + + See L{JobQueue.SubmitManyJobs}. + + """ + # Locking is done in job queue + return self._queue.SubmitManyJobs(jobs) + class _JobChangesChecker(object): def __init__(self, fields, prev_job_info, prev_log_serial): diff --git a/lib/mcpu.py b/lib/mcpu.py index 863cf9a..37588e1 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -144,6 +144,14 @@ class OpExecCbBase: # pylint: disable-msg=W0232 """ + def SubmitManyJobs(self, jobs): + """Submits jobs for processing. + + See L{jqueue.JobQueue.SubmitManyJobs}. + + """ + raise NotImplementedError + def _LUNameForOpName(opname): """Computes the LU name for a given OpCode name. @@ -209,6 +217,24 @@ class Processor(object): return acquired + def _ProcessResult(self, result): + """Examines opcode result, submitting contained jobs if necessary. + + """ + if isinstance(result, cmdlib.ResultWithJobs): + # Submit jobs + job_submission = self._cbs.SubmitManyJobs(result.jobs) + + # Build dictionary + result = result.other + + assert constants.JOB_IDS_KEY not in result, \ + "Key '%s' found in additional return values" % constants.JOB_IDS_KEY + + result[constants.JOB_IDS_KEY] = job_submission + + return result + def _ExecLU(self, lu): """Logical Unit execution sequence. 
@@ -229,7 +255,7 @@ class Processor(object): return lu.dry_run_result try: - result = lu.Exec(self.Log) + result = self._ProcessResult(lu.Exec(self.Log)) h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, self.Log, result) diff --git a/lib/opcodes.py b/lib/opcodes.py index c711046..b09ab30 100644 --- a/lib/opcodes.py +++ b/lib/opcodes.py @@ -1423,6 +1423,7 @@ class OpTestDummy(OpCode): ("result", ht.NoDefault, ht.NoType, None), ("messages", ht.NoDefault, ht.NoType, None), ("fail", ht.NoDefault, ht.NoType, None), + ("submit_jobs", None, ht.NoType, None), ] WITH_LU = False diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 9137927..73cf2bc 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -428,6 +428,9 @@ class _FakeQueueForProc: def __init__(self): self._acquired = False self._updates = [] + self._submitted = [] + + self._submit_count = itertools.count(1000) def IsAcquired(self): return self._acquired @@ -435,6 +438,9 @@ class _FakeQueueForProc: def GetNextUpdate(self): return self._updates.pop(0) + def GetNextSubmittedJob(self): + return self._submitted.pop(0) + def acquire(self, shared=0): assert shared == 1 self._acquired = True @@ -447,6 +453,12 @@ class _FakeQueueForProc: assert self._acquired, "Lock not acquired while updating job" self._updates.append((job, bool(replicate))) + def SubmitManyJobs(self, jobs): + assert not self._acquired, "Lock acquired while submitting jobs" + job_ids = [self._submit_count.next() for _ in jobs] + self._submitted.extend(zip(job_ids, jobs)) + return job_ids + class _FakeExecOpCodeForProc: def __init__(self, queue, before_start, after_start): @@ -473,6 +485,9 @@ class _FakeExecOpCodeForProc: if op.fail: raise errors.OpExecError("Error requested (%s)" % op.result) + if hasattr(op, "submit_jobs") and op.submit_jobs is not None: + return cbs.SubmitManyJobs(op.submit_jobs) + return op.result @@ -1065,6 
+1080,90 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertFalse(job.GetLogEntries(count)) self.assertFalse(job.GetLogEntries(count + 3)) + def testSubmitManyJobs(self): + queue = _FakeQueueForProc() + + job_id = 15656 + ops = [ + opcodes.OpTestDummy(result="Res0", fail=False, + submit_jobs=[]), + opcodes.OpTestDummy(result="Res1", fail=False, + submit_jobs=[ + [opcodes.OpTestDummy(result="r1j0", fail=False)], + ]), + opcodes.OpTestDummy(result="Res2", fail=False, + submit_jobs=[ + [opcodes.OpTestDummy(result="r2j0o0", fail=False), + opcodes.OpTestDummy(result="r2j0o1", fail=False), + opcodes.OpTestDummy(result="r2j0o2", fail=False), + opcodes.OpTestDummy(result="r2j0o3", fail=False)], + [opcodes.OpTestDummy(result="r2j1", fail=False)], + [opcodes.OpTestDummy(result="r2j3o0", fail=False), + opcodes.OpTestDummy(result="r2j3o1", fail=False)], + ]), + ] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + def _BeforeStart(timeout, priority): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertFalse(job.cur_opctx) + + def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + self.assertFalse(job.cur_opctx) + + # Job is running, cancelling shouldn't be possible + (success, _) = job.Cancel() + self.assertFalse(success) + + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) + + for idx in range(len(ops)): + self.assertRaises(IndexError, queue.GetNextUpdate) + result = jqueue._JobProcessor(queue, opexec, job)() + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + if idx == len(ops) - 1: + # Last opcode + 
self.assert_(result) + else: + self.assertFalse(result) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + + self.assertRaises(IndexError, queue.GetNextUpdate) + + for idx, submitted_ops in enumerate(job_ops + for op in ops + for job_ops in op.submit_jobs): + self.assertEqual(queue.GetNextSubmittedJob(), + (1000 + idx, submitted_ops)) + self.assertRaises(IndexError, queue.GetNextSubmittedJob) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) + self.assertEqual(job.GetInfo(["opresult"]), + [[[], [1000], [1001, 1002, 1003]]]) + self.assertEqual(job.GetInfo(["opstatus"]), + [len(job.ops) * [constants.OP_STATUS_SUCCESS]]) + + self._GenericCheckJob(job) + + # Finished jobs can't be processed any further + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(queue, opexec, job)) + class _FakeTimeoutStrategy: def __init__(self, timeouts):