queue.release()
+def _EvaluateJobProcessorResult(depmgr, job, result):
+ """Looks at a result from L{_JobProcessor} for a job.
+
+ To be used in a L{_JobQueueWorker}.
+
+ @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
+ with the job's ID when the result is C{_JobProcessor.FINISHED}
+ @param job: Job the result belongs to; its C{id} attribute and
+ C{CalcPriority} method are used
+ @param result: Status returned by L{_JobProcessor}; C{FINISHED}, C{DEFER}
+ and C{WAITDEP} are the recognized values
+ @raise workerpool.DeferTask: If the result is C{_JobProcessor.DEFER}; the
+ exception is created with the priority returned by C{job.CalcPriority}
+ @raise errors.ProgrammerError: If the result is none of the recognized
+ statuses
+
+ """
+ if result == _JobProcessor.FINISHED:
+ # Notify waiting jobs, passing the ID of the finished job
+ depmgr.NotifyWaiters(job.id)
+
+ elif result == _JobProcessor.DEFER:
+ # Schedule again; deferral is signalled by raising an exception carrying
+ # the job's freshly calculated priority
+ raise workerpool.DeferTask(priority=job.CalcPriority())
+
+ elif result == _JobProcessor.WAITDEP:
+ # No-op, dependency manager will re-schedule
+ pass
+
+ else:
+ # Any other value is treated as a programming error
+ raise errors.ProgrammerError("Job processor returned unknown status %s" %
+ (result, ))
+
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
proc.ExecOpCode)
- result = _JobProcessor(queue, wrap_execop_fn, job)()
-
- if result == _JobProcessor.FINISHED:
- # Notify waiting jobs
- queue.depmgr.NotifyWaiters(job.id)
-
- elif result == _JobProcessor.DEFER:
- # Schedule again
- raise workerpool.DeferTask(priority=job.CalcPriority())
-
- elif result == _JobProcessor.WAITDEP:
- # No-op, dependency manager will re-schedule
- pass
-
- else:
- raise errors.ProgrammerError("Job processor returned unknown status %s" %
- (result, ))
+ _EvaluateJobProcessorResult(queue.depmgr, job,
+ _JobProcessor(queue, wrap_execop_fn, job)())
@staticmethod
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
from ganeti import compat
from ganeti import mcpu
from ganeti import query
+from ganeti import workerpool
import testutils
self.assertRaises(IndexError, queue.GetNextUpdate)
+# Tests for jqueue._EvaluateJobProcessorResult, one test per result status.
+# NOTE(review): _FakeDependencyManager and _IdOnlyFakeJob are defined
+# elsewhere in this file; the tests presumably rely on GetNextNotification
+# raising IndexError once no recorded notifications remain -- confirm.
+class TestEvaluateJobProcessorResult(unittest.TestCase):
+ def testFinished(self):
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(30953)
+ jqueue._EvaluateJobProcessorResult(depmgr, job,
+ jqueue._JobProcessor.FINISHED)
+ # Exactly one notification, for this job's ID, is expected
+ self.assertEqual(depmgr.GetNextNotification(), job.id)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ def testDefer(self):
+ depmgr = _FakeDependencyManager()
+ # presumably the fake job's CalcPriority returns the given priority;
+ # verify against _IdOnlyFakeJob
+ job = _IdOnlyFakeJob(11326, priority=5463)
+ try:
+ jqueue._EvaluateJobProcessorResult(depmgr, job,
+ jqueue._JobProcessor.DEFER)
+ except workerpool.DeferTask, err:
+ # The deferral must carry the job's priority
+ self.assertEqual(err.priority, 5463)
+ else:
+ self.fail("Didn't raise exception")
+ # Deferring must not produce a notification
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ def testWaitdep(self):
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(21317)
+ jqueue._EvaluateJobProcessorResult(depmgr, job,
+ jqueue._JobProcessor.WAITDEP)
+ # Waiting for a dependency must neither raise nor notify
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ def testOther(self):
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(5813)
+ # An unrecognized result value must be reported as a programmer error
+ self.assertRaises(errors.ProgrammerError,
+ jqueue._EvaluateJobProcessorResult,
+ depmgr, job, "Other result")
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
self.timeouts = timeouts