Revision df5a5730
b/lib/jqueue.py | ||
---|---|---|
1237 | 1237 |
queue.release() |
1238 | 1238 |
|
1239 | 1239 |
|
1240 |
def _EvaluateJobProcessorResult(depmgr, job, result): |
|
1241 |
"""Looks at a result from L{_JobProcessor} for a job. |
|
1242 |
|
|
1243 |
To be used in a L{_JobQueueWorker}. |
|
1244 |
|
|
1245 |
""" |
|
1246 |
if result == _JobProcessor.FINISHED: |
|
1247 |
# Notify waiting jobs |
|
1248 |
depmgr.NotifyWaiters(job.id) |
|
1249 |
|
|
1250 |
elif result == _JobProcessor.DEFER: |
|
1251 |
# Schedule again |
|
1252 |
raise workerpool.DeferTask(priority=job.CalcPriority()) |
|
1253 |
|
|
1254 |
elif result == _JobProcessor.WAITDEP: |
|
1255 |
# No-op, dependency manager will re-schedule |
|
1256 |
pass |
|
1257 |
|
|
1258 |
else: |
|
1259 |
raise errors.ProgrammerError("Job processor returned unknown status %s" % |
|
1260 |
(result, )) |
|
1261 |
|
|
1262 |
|
|
1240 | 1263 |
class _JobQueueWorker(workerpool.BaseWorker): |
1241 | 1264 |
"""The actual job workers. |
1242 | 1265 |
|
... | ... | |
1277 | 1300 |
wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, |
1278 | 1301 |
proc.ExecOpCode) |
1279 | 1302 |
|
1280 |
result = _JobProcessor(queue, wrap_execop_fn, job)() |
|
1281 |
|
|
1282 |
if result == _JobProcessor.FINISHED: |
|
1283 |
# Notify waiting jobs |
|
1284 |
queue.depmgr.NotifyWaiters(job.id) |
|
1285 |
|
|
1286 |
elif result == _JobProcessor.DEFER: |
|
1287 |
# Schedule again |
|
1288 |
raise workerpool.DeferTask(priority=job.CalcPriority()) |
|
1289 |
|
|
1290 |
elif result == _JobProcessor.WAITDEP: |
|
1291 |
# No-op, dependency manager will re-schedule |
|
1292 |
pass |
|
1293 |
|
|
1294 |
else: |
|
1295 |
raise errors.ProgrammerError("Job processor returned unknown status %s" % |
|
1296 |
(result, )) |
|
1303 |
_EvaluateJobProcessorResult(queue.depmgr, job, |
|
1304 |
_JobProcessor(queue, wrap_execop_fn, job)()) |
|
1297 | 1305 |
|
1298 | 1306 |
@staticmethod |
1299 | 1307 |
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs): |
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
38 | 38 |
from ganeti import compat |
39 | 39 |
from ganeti import mcpu |
40 | 40 |
from ganeti import query |
41 |
from ganeti import workerpool |
|
41 | 42 |
|
42 | 43 |
import testutils |
43 | 44 |
|
... | ... | |
1625 | 1626 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
1626 | 1627 |
|
1627 | 1628 |
|
1629 |
class TestEvaluateJobProcessorResult(unittest.TestCase): |
|
1630 |
def testFinished(self): |
|
1631 |
depmgr = _FakeDependencyManager() |
|
1632 |
job = _IdOnlyFakeJob(30953) |
|
1633 |
jqueue._EvaluateJobProcessorResult(depmgr, job, |
|
1634 |
jqueue._JobProcessor.FINISHED) |
|
1635 |
self.assertEqual(depmgr.GetNextNotification(), job.id) |
|
1636 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1637 |
|
|
1638 |
def testDefer(self): |
|
1639 |
depmgr = _FakeDependencyManager() |
|
1640 |
job = _IdOnlyFakeJob(11326, priority=5463) |
|
1641 |
try: |
|
1642 |
jqueue._EvaluateJobProcessorResult(depmgr, job, |
|
1643 |
jqueue._JobProcessor.DEFER) |
|
1644 |
except workerpool.DeferTask, err: |
|
1645 |
self.assertEqual(err.priority, 5463) |
|
1646 |
else: |
|
1647 |
self.fail("Didn't raise exception") |
|
1648 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1649 |
|
|
1650 |
def testWaitdep(self): |
|
1651 |
depmgr = _FakeDependencyManager() |
|
1652 |
job = _IdOnlyFakeJob(21317) |
|
1653 |
jqueue._EvaluateJobProcessorResult(depmgr, job, |
|
1654 |
jqueue._JobProcessor.WAITDEP) |
|
1655 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1656 |
|
|
1657 |
def testOther(self): |
|
1658 |
depmgr = _FakeDependencyManager() |
|
1659 |
job = _IdOnlyFakeJob(5813) |
|
1660 |
self.assertRaises(errors.ProgrammerError, |
|
1661 |
jqueue._EvaluateJobProcessorResult, |
|
1662 |
depmgr, job, "Other result") |
|
1663 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1664 |
|
|
1665 |
|
|
1628 | 1666 |
class _FakeTimeoutStrategy: |
1629 | 1667 |
def __init__(self, timeouts): |
1630 | 1668 |
self.timeouts = timeouts |
Also available in: Unified diff