Revision 5fd6b694 test/ganeti.jqueue_unittest.py
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
521 | 521 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
522 | 522 |
self.assertFalse(queue.IsAcquired()) |
523 | 523 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
524 |
self.assertFalse(job.cur_opctx) |
|
524 | 525 |
|
525 | 526 |
def _AfterStart(op, cbs): |
526 | 527 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
... | ... | |
528 | 529 |
|
529 | 530 |
self.assertFalse(queue.IsAcquired()) |
530 | 531 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
532 |
self.assertFalse(job.cur_opctx) |
|
531 | 533 |
|
532 | 534 |
# Job is running, cancelling shouldn't be possible |
533 | 535 |
(success, _) = job.Cancel() |
... | ... | |
1049 | 1051 |
self.retries = 0 |
1050 | 1052 |
self.prev_tsop = None |
1051 | 1053 |
self.prev_prio = None |
1054 |
self.prev_status = None |
|
1055 |
self.lock_acq_prio = None |
|
1052 | 1056 |
self.gave_lock = None |
1053 | 1057 |
self.done_lock_before_blocking = False |
1054 | 1058 |
|
1055 | 1059 |
def _BeforeStart(self, timeout, priority): |
1056 | 1060 |
job = self.job |
1057 | 1061 |
|
1058 |
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) |
|
1062 |
# If status has changed, job must've been written |
|
1063 |
if self.prev_status != self.job.ops[self.curop].status: |
|
1064 |
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) |
|
1059 | 1065 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
1066 |
|
|
1060 | 1067 |
self.assertFalse(self.queue.IsAcquired()) |
1061 | 1068 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
1062 | 1069 |
|
... | ... | |
1067 | 1074 |
self.assertEqual(priority, job.ops[self.curop].priority) |
1068 | 1075 |
|
1069 | 1076 |
self.gave_lock = True |
1077 |
self.lock_acq_prio = priority |
|
1070 | 1078 |
|
1071 | 1079 |
if (self.curop == 3 and |
1072 | 1080 |
job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3): |
... | ... | |
1090 | 1098 |
def _AfterStart(self, op, cbs): |
1091 | 1099 |
job = self.job |
1092 | 1100 |
|
1101 |
# Setting to "running" requires an update |
|
1093 | 1102 |
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) |
1094 | 1103 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
1104 |
|
|
1095 | 1105 |
self.assertFalse(self.queue.IsAcquired()) |
1096 | 1106 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
1097 | 1107 |
|
... | ... | |
1102 | 1112 |
def _NextOpcode(self): |
1103 | 1113 |
self.curop = self.opcounter.next() |
1104 | 1114 |
self.prev_prio = self.job.ops[self.curop].priority |
1115 |
self.prev_status = self.job.ops[self.curop].status |
|
1105 | 1116 |
|
1106 | 1117 |
def _NewTimeoutStrategy(self): |
1107 | 1118 |
job = self.job |
... | ... | |
1173 | 1184 |
|
1174 | 1185 |
self.assertFalse(self.done_lock_before_blocking) |
1175 | 1186 |
|
1176 |
for i in itertools.count(0):
|
|
1187 |
while True:
|
|
1177 | 1188 |
proc = jqueue._JobProcessor(self.queue, opexec, job, |
1178 | 1189 |
_timeout_strategy_factory=tsf) |
1179 | 1190 |
|
1180 | 1191 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
1192 |
|
|
1193 |
if self.curop is not None: |
|
1194 |
self.prev_status = self.job.ops[self.curop].status |
|
1195 |
|
|
1196 |
self.lock_acq_prio = None |
|
1197 |
|
|
1181 | 1198 |
result = proc(_nextop_fn=self._NextOpcode) |
1182 |
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) |
|
1183 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
|
1199 |
assert self.curop is not None |
|
1200 |
|
|
1201 |
if result or self.gave_lock: |
|
1202 |
# Got lock and/or job is done, result must've been written |
|
1203 |
self.assertFalse(job.cur_opctx) |
|
1204 |
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) |
|
1205 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
|
1206 |
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority) |
|
1207 |
self.assert_(job.ops[self.curop].exec_timestamp) |
|
1208 |
|
|
1184 | 1209 |
if result: |
1185 | 1210 |
self.assertFalse(job.cur_opctx) |
1186 | 1211 |
break |
1187 | 1212 |
|
1188 | 1213 |
self.assertFalse(result) |
1189 | 1214 |
|
1215 |
if self.curop == 0: |
|
1216 |
self.assertEqual(job.ops[self.curop].start_timestamp, |
|
1217 |
job.start_timestamp) |
|
1218 |
|
|
1190 | 1219 |
if self.gave_lock: |
1191 |
self.assertFalse(job.cur_opctx) |
|
1220 |
# Opcode finished, but job not yet done |
|
1221 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1192 | 1222 |
else: |
1223 |
# Did not get locks |
|
1193 | 1224 |
self.assert_(job.cur_opctx) |
1194 | 1225 |
self.assertEqual(job.cur_opctx._timeout_strategy._fn, |
1195 | 1226 |
self.timeout_strategy.NextAttempt) |
1227 |
self.assertFalse(job.ops[self.curop].exec_timestamp) |
|
1228 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1229 |
|
|
1230 |
# If priority has changed since acquiring locks, the job must've been |
|
1231 |
# updated |
|
1232 |
if self.lock_acq_prio != job.ops[self.curop].priority: |
|
1233 |
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) |
|
1234 |
|
|
1235 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
|
1196 | 1236 |
|
1197 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1198 | 1237 |
self.assert_(job.start_timestamp) |
1199 | 1238 |
self.assertFalse(job.end_timestamp) |
1200 | 1239 |
|
Also available in: Unified diff