jqueue: Factor out checking of the job processor's result
author Michael Hanselmann <hansmi@google.com>
Mon, 19 Dec 2011 15:25:24 +0000 (16:25 +0100)
committer Michael Hanselmann <hansmi@google.com>
Thu, 22 Dec 2011 13:19:36 +0000 (14:19 +0100)
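Moving the evaluation of the job processor's result out of the worker
and into a module-level function allows it to be unit-tested directly.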

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/jqueue.py
test/ganeti.jqueue_unittest.py

index d5ea3cb..84f6120 100644 (file)
@@ -1237,6 +1237,29 @@ class _JobProcessor(object):
       queue.release()
 
 
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
@@ -1277,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                     proc.ExecOpCode)
 
-    result = _JobProcessor(queue, wrap_execop_fn, job)()
-
-    if result == _JobProcessor.FINISHED:
-      # Notify waiting jobs
-      queue.depmgr.NotifyWaiters(job.id)
-
-    elif result == _JobProcessor.DEFER:
-      # Schedule again
-      raise workerpool.DeferTask(priority=job.CalcPriority())
-
-    elif result == _JobProcessor.WAITDEP:
-      # No-op, dependency manager will re-schedule
-      pass
-
-    else:
-      raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                   (result, ))
+    _EvaluateJobProcessorResult(queue.depmgr, job,
+                                _JobProcessor(queue, wrap_execop_fn, job)())
 
   @staticmethod
   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
index 4669320..cd0bacd 100755 (executable)
@@ -38,6 +38,7 @@ from ganeti import opcodes
 from ganeti import compat
 from ganeti import mcpu
 from ganeti import query
+from ganeti import workerpool
 
 import testutils
 
@@ -1625,6 +1626,43 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
 
+class TestEvaluateJobProcessorResult(unittest.TestCase):
+  def testFinished(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(30953)
+    jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                       jqueue._JobProcessor.FINISHED)
+    self.assertEqual(depmgr.GetNextNotification(), job.id)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testDefer(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(11326, priority=5463)
+    try:
+      jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                         jqueue._JobProcessor.DEFER)
+    except workerpool.DeferTask, err:
+      self.assertEqual(err.priority, 5463)
+    else:
+      self.fail("Didn't raise exception")
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testWaitdep(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(21317)
+    jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                       jqueue._JobProcessor.WAITDEP)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testOther(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(5813)
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._EvaluateJobProcessorResult,
+                      depmgr, job, "Other result")
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+
 class _FakeTimeoutStrategy:
   def __init__(self, timeouts):
     self.timeouts = timeouts