Revision 26d3fd2f test/ganeti.jqueue_unittest.py
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
27 | 27 |
import tempfile |
28 | 28 |
import shutil |
29 | 29 |
import errno |
30 |
import itertools |
|
30 | 31 |
|
31 | 32 |
from ganeti import constants |
32 | 33 |
from ganeti import utils |
... | ... | |
34 | 35 |
from ganeti import jqueue |
35 | 36 |
from ganeti import opcodes |
36 | 37 |
from ganeti import compat |
38 |
from ganeti import mcpu |
|
37 | 39 |
|
38 | 40 |
import testutils |
39 | 41 |
|
... | ... | |
447 | 449 |
self._before_start = before_start |
448 | 450 |
self._after_start = after_start |
449 | 451 |
|
450 |
def __call__(self, op, cbs): |
|
452 |
def __call__(self, op, cbs, timeout=None):
|
|
451 | 453 |
assert isinstance(op, opcodes.OpTestDummy) |
452 | 454 |
|
453 | 455 |
if self._before_start: |
454 |
self._before_start() |
|
456 |
self._before_start(timeout)
|
|
455 | 457 |
|
456 | 458 |
cbs.NotifyStart() |
457 | 459 |
|
... | ... | |
464 | 466 |
return op.result |
465 | 467 |
|
466 | 468 |
|
467 |
class TestJobProcessor(unittest.TestCase):
|
|
469 |
class _JobProcessorTestUtils:
|
|
468 | 470 |
def _CreateJob(self, queue, job_id, ops): |
469 | 471 |
job = jqueue._QueuedJob(queue, job_id, ops) |
470 | 472 |
self.assertFalse(job.start_timestamp) |
... | ... | |
475 | 477 |
self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]]) |
476 | 478 |
return job |
477 | 479 |
|
480 |
|
|
481 |
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): |
|
478 | 482 |
def _GenericCheckJob(self, job): |
479 | 483 |
assert compat.all(isinstance(op.input, opcodes.OpTestDummy) |
480 | 484 |
for op in job.ops) |
... | ... | |
502 | 506 |
# Create job |
503 | 507 |
job = self._CreateJob(queue, job_id, ops) |
504 | 508 |
|
505 |
def _BeforeStart(): |
|
509 |
def _BeforeStart(_):
|
|
506 | 510 |
self.assertFalse(queue.IsAcquired()) |
507 | 511 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
508 | 512 |
|
... | ... | |
656 | 660 |
|
657 | 661 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
658 | 662 |
|
659 |
def _BeforeStart(): |
|
663 |
def _BeforeStart(_):
|
|
660 | 664 |
self.assertFalse(queue.IsAcquired()) |
661 | 665 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
662 | 666 |
|
... | ... | |
845 | 849 |
# Create job |
846 | 850 |
job = self._CreateJob(queue, 29386, ops) |
847 | 851 |
|
848 |
def _BeforeStart(): |
|
852 |
def _BeforeStart(_):
|
|
849 | 853 |
self.assertFalse(queue.IsAcquired()) |
850 | 854 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
851 | 855 |
|
... | ... | |
922 | 926 |
self.assertFalse(job.GetLogEntries(count + 3)) |
923 | 927 |
|
924 | 928 |
|
929 |
class _FakeTimeoutStrategy: |
|
930 |
def __init__(self, timeouts): |
|
931 |
self.timeouts = timeouts |
|
932 |
self.attempts = 0 |
|
933 |
self.last_timeout = None |
|
934 |
|
|
935 |
def NextAttempt(self): |
|
936 |
self.attempts += 1 |
|
937 |
if self.timeouts: |
|
938 |
timeout = self.timeouts.pop(0) |
|
939 |
else: |
|
940 |
timeout = None |
|
941 |
self.last_timeout = timeout |
|
942 |
return timeout |
|
943 |
|
|
944 |
|
|
945 |
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): |
|
946 |
def setUp(self): |
|
947 |
self.queue = _FakeQueueForProc() |
|
948 |
self.job = None |
|
949 |
self.curop = None |
|
950 |
self.opcounter = None |
|
951 |
self.timeout_strategy = None |
|
952 |
self.retries = 0 |
|
953 |
self.prev_tsop = None |
|
954 |
self.prev_prio = None |
|
955 |
self.gave_lock = None |
|
956 |
self.done_lock_before_blocking = False |
|
957 |
|
|
958 |
def _BeforeStart(self, timeout): |
|
959 |
job = self.job |
|
960 |
|
|
961 |
self.assertFalse(self.queue.IsAcquired()) |
|
962 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
963 |
|
|
964 |
ts = self.timeout_strategy |
|
965 |
|
|
966 |
self.assert_(timeout is None or isinstance(timeout, (int, float))) |
|
967 |
self.assertEqual(timeout, ts.last_timeout) |
|
968 |
|
|
969 |
self.gave_lock = True |
|
970 |
|
|
971 |
if (self.curop == 3 and |
|
972 |
job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3): |
|
973 |
# Give locks before running into blocking acquire |
|
974 |
assert self.retries == 7 |
|
975 |
self.retries = 0 |
|
976 |
self.done_lock_before_blocking = True |
|
977 |
return |
|
978 |
|
|
979 |
if self.retries > 0: |
|
980 |
self.assert_(timeout is not None) |
|
981 |
self.retries -= 1 |
|
982 |
self.gave_lock = False |
|
983 |
raise mcpu.LockAcquireTimeout() |
|
984 |
|
|
985 |
if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST: |
|
986 |
assert self.retries == 0, "Didn't exhaust all retries at highest priority" |
|
987 |
assert not ts.timeouts |
|
988 |
self.assert_(timeout is None) |
|
989 |
|
|
990 |
def _AfterStart(self, op, cbs): |
|
991 |
job = self.job |
|
992 |
|
|
993 |
self.assertFalse(self.queue.IsAcquired()) |
|
994 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
|
995 |
|
|
996 |
# Job is running, cancelling shouldn't be possible |
|
997 |
(success, _) = job.Cancel() |
|
998 |
self.assertFalse(success) |
|
999 |
|
|
1000 |
def _NextOpcode(self): |
|
1001 |
self.curop = self.opcounter.next() |
|
1002 |
self.prev_prio = self.job.ops[self.curop].priority |
|
1003 |
|
|
1004 |
def _NewTimeoutStrategy(self): |
|
1005 |
job = self.job |
|
1006 |
|
|
1007 |
self.assertEqual(self.retries, 0) |
|
1008 |
|
|
1009 |
if self.prev_tsop == self.curop: |
|
1010 |
# Still on the same opcode, priority must've been increased |
|
1011 |
self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1) |
|
1012 |
|
|
1013 |
if self.curop == 1: |
|
1014 |
# Normal retry |
|
1015 |
timeouts = range(10, 31, 10) |
|
1016 |
self.retries = len(timeouts) - 1 |
|
1017 |
|
|
1018 |
elif self.curop == 2: |
|
1019 |
# Let this run into a blocking acquire |
|
1020 |
timeouts = range(11, 61, 12) |
|
1021 |
self.retries = len(timeouts) |
|
1022 |
|
|
1023 |
elif self.curop == 3: |
|
1024 |
# Wait for priority to increase, but give lock before blocking acquire |
|
1025 |
timeouts = range(12, 100, 14) |
|
1026 |
self.retries = len(timeouts) |
|
1027 |
|
|
1028 |
self.assertFalse(self.done_lock_before_blocking) |
|
1029 |
|
|
1030 |
elif self.curop == 4: |
|
1031 |
self.assert_(self.done_lock_before_blocking) |
|
1032 |
|
|
1033 |
# Timeouts, but no need to retry |
|
1034 |
timeouts = range(10, 31, 10) |
|
1035 |
self.retries = 0 |
|
1036 |
|
|
1037 |
elif self.curop == 5: |
|
1038 |
# Normal retry |
|
1039 |
timeouts = range(19, 100, 11) |
|
1040 |
self.retries = len(timeouts) |
|
1041 |
|
|
1042 |
else: |
|
1043 |
timeouts = [] |
|
1044 |
self.retries = 0 |
|
1045 |
|
|
1046 |
assert len(job.ops) == 10 |
|
1047 |
assert self.retries <= len(timeouts) |
|
1048 |
|
|
1049 |
ts = _FakeTimeoutStrategy(timeouts) |
|
1050 |
|
|
1051 |
self.timeout_strategy = ts |
|
1052 |
self.prev_tsop = self.curop |
|
1053 |
self.prev_prio = job.ops[self.curop].priority |
|
1054 |
|
|
1055 |
return ts |
|
1056 |
|
|
1057 |
def testTimeout(self): |
|
1058 |
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) |
|
1059 |
for i in range(10)] |
|
1060 |
|
|
1061 |
# Create job |
|
1062 |
job_id = 15801 |
|
1063 |
job = self._CreateJob(self.queue, job_id, ops) |
|
1064 |
self.job = job |
|
1065 |
|
|
1066 |
self.opcounter = itertools.count(0) |
|
1067 |
|
|
1068 |
opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart) |
|
1069 |
tsf = self._NewTimeoutStrategy |
|
1070 |
|
|
1071 |
self.assertFalse(self.done_lock_before_blocking) |
|
1072 |
|
|
1073 |
for i in itertools.count(0): |
|
1074 |
proc = jqueue._JobProcessor(self.queue, opexec, job, |
|
1075 |
_timeout_strategy_factory=tsf) |
|
1076 |
|
|
1077 |
result = proc(_nextop_fn=self._NextOpcode) |
|
1078 |
if result: |
|
1079 |
self.assertFalse(job.cur_opctx) |
|
1080 |
break |
|
1081 |
|
|
1082 |
self.assertFalse(result) |
|
1083 |
|
|
1084 |
if self.gave_lock: |
|
1085 |
self.assertFalse(job.cur_opctx) |
|
1086 |
else: |
|
1087 |
self.assert_(job.cur_opctx) |
|
1088 |
self.assertEqual(job.cur_opctx._timeout_strategy._fn, |
|
1089 |
self.timeout_strategy.NextAttempt) |
|
1090 |
|
|
1091 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1092 |
self.assert_(job.start_timestamp) |
|
1093 |
self.assertFalse(job.end_timestamp) |
|
1094 |
|
|
1095 |
self.assertEqual(self.curop, len(job.ops) - 1) |
|
1096 |
self.assertEqual(self.job, job) |
|
1097 |
self.assertEqual(self.opcounter.next(), len(job.ops)) |
|
1098 |
self.assert_(self.done_lock_before_blocking) |
|
1099 |
|
|
1100 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) |
|
1101 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) |
|
1102 |
self.assertEqual(job.GetInfo(["opresult"]), |
|
1103 |
[[op.input.result for op in job.ops]]) |
|
1104 |
self.assertEqual(job.GetInfo(["opstatus"]), |
|
1105 |
[len(job.ops) * [constants.OP_STATUS_SUCCESS]]) |
|
1106 |
self.assert_(compat.all(op.start_timestamp and op.end_timestamp |
|
1107 |
for op in job.ops)) |
|
1108 |
|
|
1109 |
# Finished jobs can't be processed any further |
|
1110 |
self.assertRaises(errors.ProgrammerError, |
|
1111 |
jqueue._JobProcessor(self.queue, opexec, job)) |
|
1112 |
|
|
1113 |
|
|
925 | 1114 |
if __name__ == "__main__": |
926 | 1115 |
testutils.GanetiTestProgram() |
Also available in: Unified diff