Implement submitting jobs from logical units
author Michael Hanselmann <hansmi@google.com>
Fri, 25 Mar 2011 13:22:24 +0000 (14:22 +0100)
committer Michael Hanselmann <hansmi@google.com>
Fri, 25 Mar 2011 13:53:02 +0000 (14:53 +0100)
The design details can be seen in the design document
(doc/design-lu-generated-jobs.rst).

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: René Nussbaumer <rn@google.com>

lib/cmdlib.py
lib/constants.py
lib/jqueue.py
lib/mcpu.py
lib/opcodes.py
test/ganeti.jqueue_unittest.py

index af51ec2..a35a54b 100644 (file)
@@ -74,7 +74,28 @@ def _SupportsOob(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
 
-# End types
+class ResultWithJobs:
+  """Data container for LU results with jobs.
+
+  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
+  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+  contained in the C{jobs} attribute and include the job IDs in the opcode
+  result.
+
+  """
+  def __init__(self, jobs, **kwargs):
+    """Initializes this class.
+
+    Additional return values can be specified as keyword arguments.
+
+    @type jobs: list of lists of L{opcode.OpCode}
+    @param jobs: A list of lists of opcode objects
+
+    """
+    self.jobs = jobs
+    self.other = kwargs
+
+
 class LogicalUnit(object):
   """Logical Unit base class.
 
index 2ae1a43..973914b 100644 (file)
@@ -529,6 +529,9 @@ DISK_TRANSFER_CONNECT_TIMEOUT = 60
 # Disk index separator
 DISK_SEPARATOR = _autoconf.DISK_SEPARATOR
 
+#: Key for job IDs in opcode result
+JOB_IDS_KEY = "jobs"
+
 # runparts results
 (RUNPARTS_SKIP,
  RUNPARTS_RUN,
index 2871c6a..6b60c5e 100644 (file)
@@ -540,6 +540,15 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{JobQueue.SubmitManyJobs}.
+
+    """
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
+
 
 class _JobChangesChecker(object):
   def __init__(self, fields, prev_job_info, prev_log_serial):
index 863cf9a..37588e1 100644 (file)
@@ -144,6 +144,14 @@ class OpExecCbBase: # pylint: disable-msg=W0232
 
     """
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{jqueue.JobQueue.SubmitManyJobs}.
+
+    """
+    raise NotImplementedError
+
 
 def _LUNameForOpName(opname):
   """Computes the LU name for a given OpCode name.
@@ -209,6 +217,24 @@ class Processor(object):
 
     return acquired
 
+  def _ProcessResult(self, result):
+    """
+
+    """
+    if isinstance(result, cmdlib.ResultWithJobs):
+      # Submit jobs
+      job_submission = self._cbs.SubmitManyJobs(result.jobs)
+
+      # Build dictionary
+      result = result.other
+
+      assert constants.JOB_IDS_KEY not in result, \
+        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+      result[constants.JOB_IDS_KEY] = job_submission
+
+    return result
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -229,7 +255,7 @@ class Processor(object):
       return lu.dry_run_result
 
     try:
-      result = lu.Exec(self.Log)
+      result = self._ProcessResult(lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
index c711046..b09ab30 100644 (file)
@@ -1423,6 +1423,7 @@ class OpTestDummy(OpCode):
     ("result", ht.NoDefault, ht.NoType, None),
     ("messages", ht.NoDefault, ht.NoType, None),
     ("fail", ht.NoDefault, ht.NoType, None),
+    ("submit_jobs", None, ht.NoType, None),
     ]
   WITH_LU = False
 
index 9137927..73cf2bc 100755 (executable)
@@ -428,6 +428,9 @@ class _FakeQueueForProc:
   def __init__(self):
     self._acquired = False
     self._updates = []
+    self._submitted = []
+
+    self._submit_count = itertools.count(1000)
 
   def IsAcquired(self):
     return self._acquired
@@ -435,6 +438,9 @@ class _FakeQueueForProc:
   def GetNextUpdate(self):
     return self._updates.pop(0)
 
+  def GetNextSubmittedJob(self):
+    return self._submitted.pop(0)
+
   def acquire(self, shared=0):
     assert shared == 1
     self._acquired = True
@@ -447,6 +453,12 @@ class _FakeQueueForProc:
     assert self._acquired, "Lock not acquired while updating job"
     self._updates.append((job, bool(replicate)))
 
+  def SubmitManyJobs(self, jobs):
+    assert not self._acquired, "Lock acquired while submitting jobs"
+    job_ids = [self._submit_count.next() for _ in jobs]
+    self._submitted.extend(zip(job_ids, jobs))
+    return job_ids
+
 
 class _FakeExecOpCodeForProc:
   def __init__(self, queue, before_start, after_start):
@@ -473,6 +485,9 @@ class _FakeExecOpCodeForProc:
     if op.fail:
       raise errors.OpExecError("Error requested (%s)" % op.result)
 
+    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
+      return cbs.SubmitManyJobs(op.submit_jobs)
+
     return op.result
 
 
@@ -1065,6 +1080,90 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertFalse(job.GetLogEntries(count))
     self.assertFalse(job.GetLogEntries(count + 3))
 
+  def testSubmitManyJobs(self):
+    queue = _FakeQueueForProc()
+
+    job_id = 15656
+    ops = [
+      opcodes.OpTestDummy(result="Res0", fail=False,
+                          submit_jobs=[]),
+      opcodes.OpTestDummy(result="Res1", fail=False,
+                          submit_jobs=[
+                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
+                            ]),
+      opcodes.OpTestDummy(result="Res2", fail=False,
+                          submit_jobs=[
+                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
+                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
+                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
+                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
+                            ]),
+      ]
+
+    # Create job
+    job = self._CreateJob(queue, job_id, ops)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertFalse(job.cur_opctx)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+      self.assertFalse(job.cur_opctx)
+
+      # Job is running, cancelling shouldn't be possible
+      (success, _) = job.Cancel()
+      self.assertFalse(success)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    for idx in range(len(ops)):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      if idx == len(ops) - 1:
+        # Last opcode
+        self.assert_(result)
+      else:
+        self.assertFalse(result)
+
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+        self.assert_(job.start_timestamp)
+        self.assertFalse(job.end_timestamp)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    for idx, submitted_ops in enumerate(job_ops
+                                        for op in ops
+                                        for job_ops in op.submit_jobs):
+      self.assertEqual(queue.GetNextSubmittedJob(),
+                       (1000 + idx, submitted_ops))
+    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[[], [1000], [1001, 1002, 1003]]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+
+    self._GenericCheckJob(job)
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
 
 class _FakeTimeoutStrategy:
   def __init__(self, timeouts):