Revision b95479a5 test/ganeti.jqueue_unittest.py
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
424 | 424 |
self.assertEqual(job.CalcStatus(), status) |
425 | 425 |
|
426 | 426 |
|
427 |
class _FakeQueueForProc:
|
|
427 |
class _FakeDependencyManager:
|
|
428 | 428 |
def __init__(self): |
429 |
self._checks = [] |
|
430 |
self._notifications = [] |
|
431 |
self._waiting = set() |
|
432 |
|
|
433 |
def AddCheckResult(self, job, dep_job_id, dep_status, result): |
|
434 |
self._checks.append((job, dep_job_id, dep_status, result)) |
|
435 |
|
|
436 |
def CountPendingResults(self): |
|
437 |
return len(self._checks) |
|
438 |
|
|
439 |
def CountWaitingJobs(self): |
|
440 |
return len(self._waiting) |
|
441 |
|
|
442 |
def GetNextNotification(self): |
|
443 |
return self._notifications.pop(0) |
|
444 |
|
|
445 |
def JobWaiting(self, job): |
|
446 |
return job in self._waiting |
|
447 |
|
|
448 |
def CheckAndRegister(self, job, dep_job_id, dep_status): |
|
449 |
(exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0) |
|
450 |
|
|
451 |
assert exp_job == job |
|
452 |
assert exp_dep_job_id == dep_job_id |
|
453 |
assert exp_dep_status == dep_status |
|
454 |
|
|
455 |
(result_status, _) = result |
|
456 |
|
|
457 |
if result_status == jqueue._JobDependencyManager.WAIT: |
|
458 |
self._waiting.add(job) |
|
459 |
elif result_status == jqueue._JobDependencyManager.CONTINUE: |
|
460 |
self._waiting.remove(job) |
|
461 |
|
|
462 |
return result |
|
463 |
|
|
464 |
def NotifyWaiters(self, job_id): |
|
465 |
self._notifications.append(job_id) |
|
466 |
|
|
467 |
|
|
468 |
class _DisabledFakeDependencyManager: |
|
469 |
def JobWaiting(self, _): |
|
470 |
return False |
|
471 |
|
|
472 |
def CheckAndRegister(self, *args): |
|
473 |
assert False, "Should not be called" |
|
474 |
|
|
475 |
def NotifyWaiters(self, _): |
|
476 |
pass |
|
477 |
|
|
478 |
|
|
479 |
class _FakeQueueForProc: |
|
480 |
def __init__(self, depmgr=None): |
|
429 | 481 |
self._acquired = False |
430 | 482 |
self._updates = [] |
431 | 483 |
self._submitted = [] |
432 | 484 |
|
433 | 485 |
self._submit_count = itertools.count(1000) |
434 | 486 |
|
487 |
if depmgr: |
|
488 |
self.depmgr = depmgr |
|
489 |
else: |
|
490 |
self.depmgr = _DisabledFakeDependencyManager() |
|
491 |
|
|
435 | 492 |
def IsAcquired(self): |
436 | 493 |
return self._acquired |
437 | 494 |
|
... | ... | |
960 | 1017 |
|
961 | 1018 |
# ... also after being restored |
962 | 1019 |
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize()) |
1020 |
# Calling the processor on a finished job should be a no-op |
|
963 | 1021 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)()) |
964 | 1022 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
965 | 1023 |
|
... | ... | |
1179 | 1237 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) |
1180 | 1238 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
1181 | 1239 |
|
1240 |
def testJobDependency(self): |
|
1241 |
depmgr = _FakeDependencyManager() |
|
1242 |
queue = _FakeQueueForProc(depmgr=depmgr) |
|
1243 |
|
|
1244 |
self.assertEqual(queue.depmgr, depmgr) |
|
1245 |
|
|
1246 |
prev_job_id = 22113 |
|
1247 |
prev_job_id2 = 28102 |
|
1248 |
job_id = 29929 |
|
1249 |
ops = [ |
|
1250 |
opcodes.OpTestDummy(result="Res0", fail=False, |
|
1251 |
depends=[ |
|
1252 |
[prev_job_id2, None], |
|
1253 |
[prev_job_id, None], |
|
1254 |
]), |
|
1255 |
opcodes.OpTestDummy(result="Res1", fail=False), |
|
1256 |
] |
|
1257 |
|
|
1258 |
# Create job |
|
1259 |
job = self._CreateJob(queue, job_id, ops) |
|
1260 |
|
|
1261 |
def _BeforeStart(timeout, priority): |
|
1262 |
if attempt == 0 or attempt > 5: |
|
1263 |
# Job should only be updated when it wasn't waiting for another job |
|
1264 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1265 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1266 |
self.assertFalse(queue.IsAcquired()) |
|
1267 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1268 |
self.assertFalse(job.cur_opctx) |
|
1269 |
|
|
1270 |
def _AfterStart(op, cbs): |
|
1271 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1272 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1273 |
|
|
1274 |
self.assertFalse(queue.IsAcquired()) |
|
1275 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
|
1276 |
self.assertFalse(job.cur_opctx) |
|
1277 |
|
|
1278 |
# Job is running, cancelling shouldn't be possible |
|
1279 |
(success, _) = job.Cancel() |
|
1280 |
self.assertFalse(success) |
|
1281 |
|
|
1282 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) |
|
1283 |
|
|
1284 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1285 |
|
|
1286 |
counter = itertools.count() |
|
1287 |
while True: |
|
1288 |
attempt = counter.next() |
|
1289 |
|
|
1290 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1291 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1292 |
|
|
1293 |
if attempt < 2: |
|
1294 |
depmgr.AddCheckResult(job, prev_job_id2, None, |
|
1295 |
(jqueue._JobDependencyManager.WAIT, "wait2")) |
|
1296 |
elif attempt == 2: |
|
1297 |
depmgr.AddCheckResult(job, prev_job_id2, None, |
|
1298 |
(jqueue._JobDependencyManager.CONTINUE, "cont")) |
|
1299 |
# The processor will ask for the next dependency immediately |
|
1300 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1301 |
(jqueue._JobDependencyManager.WAIT, "wait")) |
|
1302 |
elif attempt < 5: |
|
1303 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1304 |
(jqueue._JobDependencyManager.WAIT, "wait")) |
|
1305 |
elif attempt == 5: |
|
1306 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1307 |
(jqueue._JobDependencyManager.CONTINUE, "cont")) |
|
1308 |
if attempt == 2: |
|
1309 |
self.assertEqual(depmgr.CountPendingResults(), 2) |
|
1310 |
elif attempt > 5: |
|
1311 |
self.assertEqual(depmgr.CountPendingResults(), 0) |
|
1312 |
else: |
|
1313 |
self.assertEqual(depmgr.CountPendingResults(), 1) |
|
1314 |
|
|
1315 |
result = jqueue._JobProcessor(queue, opexec, job)() |
|
1316 |
if attempt == 0 or attempt >= 5: |
|
1317 |
# Job should only be updated if there was an actual change |
|
1318 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1319 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1320 |
self.assertFalse(depmgr.CountPendingResults()) |
|
1321 |
|
|
1322 |
if attempt < 5: |
|
1323 |
# Simulate waiting for other job |
|
1324 |
self.assertTrue(result) |
|
1325 |
self.assertTrue(job.cur_opctx) |
|
1326 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1327 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1328 |
self.assert_(job.start_timestamp) |
|
1329 |
self.assertFalse(job.end_timestamp) |
|
1330 |
continue |
|
1331 |
|
|
1332 |
if result: |
|
1333 |
# Last opcode |
|
1334 |
self.assertFalse(job.cur_opctx) |
|
1335 |
self.assertEqual(queue.depmgr.GetNextNotification(), job_id) |
|
1336 |
break |
|
1337 |
|
|
1338 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1339 |
|
|
1340 |
self.assertFalse(result) |
|
1341 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1342 |
self.assert_(job.start_timestamp) |
|
1343 |
self.assertFalse(job.end_timestamp) |
|
1344 |
|
|
1345 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) |
|
1346 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) |
|
1347 |
self.assertEqual(job.GetInfo(["opresult"]), |
|
1348 |
[[op.input.result for op in job.ops]]) |
|
1349 |
self.assertEqual(job.GetInfo(["opstatus"]), |
|
1350 |
[len(job.ops) * [constants.OP_STATUS_SUCCESS]]) |
|
1351 |
self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp |
|
1352 |
for op in job.ops)) |
|
1353 |
|
|
1354 |
self._GenericCheckJob(job) |
|
1355 |
|
|
1356 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1357 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1358 |
self.assertFalse(depmgr.CountPendingResults()) |
|
1359 |
self.assertFalse(depmgr.CountWaitingJobs()) |
|
1360 |
|
|
1361 |
# Calling the processor on a finished job should be a no-op |
|
1362 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) |
|
1363 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1364 |
|
|
1365 |
def testJobDependencyCancel(self): |
|
1366 |
depmgr = _FakeDependencyManager() |
|
1367 |
queue = _FakeQueueForProc(depmgr=depmgr) |
|
1368 |
|
|
1369 |
self.assertEqual(queue.depmgr, depmgr) |
|
1370 |
|
|
1371 |
prev_job_id = 13623 |
|
1372 |
job_id = 30876 |
|
1373 |
ops = [ |
|
1374 |
opcodes.OpTestDummy(result="Res0", fail=False), |
|
1375 |
opcodes.OpTestDummy(result="Res1", fail=False, |
|
1376 |
depends=[ |
|
1377 |
[prev_job_id, None], |
|
1378 |
]), |
|
1379 |
opcodes.OpTestDummy(result="Res2", fail=False), |
|
1380 |
] |
|
1381 |
|
|
1382 |
# Create job |
|
1383 |
job = self._CreateJob(queue, job_id, ops) |
|
1384 |
|
|
1385 |
def _BeforeStart(timeout, priority): |
|
1386 |
if attempt == 0 or attempt > 5: |
|
1387 |
# Job should only be updated when it wasn't waiting for another job |
|
1388 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1389 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1390 |
self.assertFalse(queue.IsAcquired()) |
|
1391 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1392 |
self.assertFalse(job.cur_opctx) |
|
1393 |
|
|
1394 |
def _AfterStart(op, cbs): |
|
1395 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1396 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1397 |
|
|
1398 |
self.assertFalse(queue.IsAcquired()) |
|
1399 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
|
1400 |
self.assertFalse(job.cur_opctx) |
|
1401 |
|
|
1402 |
# Job is running, cancelling shouldn't be possible |
|
1403 |
(success, _) = job.Cancel() |
|
1404 |
self.assertFalse(success) |
|
1405 |
|
|
1406 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) |
|
1407 |
|
|
1408 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1409 |
|
|
1410 |
counter = itertools.count() |
|
1411 |
while True: |
|
1412 |
attempt = counter.next() |
|
1413 |
|
|
1414 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1415 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1416 |
|
|
1417 |
if attempt == 0: |
|
1418 |
# This will handle the first opcode |
|
1419 |
pass |
|
1420 |
elif attempt < 4: |
|
1421 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1422 |
(jqueue._JobDependencyManager.WAIT, "wait")) |
|
1423 |
elif attempt == 4: |
|
1424 |
# Other job was cancelled |
|
1425 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1426 |
(jqueue._JobDependencyManager.CANCEL, "cancel")) |
|
1427 |
|
|
1428 |
if attempt == 0: |
|
1429 |
self.assertEqual(depmgr.CountPendingResults(), 0) |
|
1430 |
else: |
|
1431 |
self.assertEqual(depmgr.CountPendingResults(), 1) |
|
1432 |
|
|
1433 |
result = jqueue._JobProcessor(queue, opexec, job)() |
|
1434 |
if attempt <= 1 or attempt >= 4: |
|
1435 |
# Job should only be updated if there was an actual change |
|
1436 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1437 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1438 |
self.assertFalse(depmgr.CountPendingResults()) |
|
1439 |
|
|
1440 |
if attempt > 0 and attempt < 4: |
|
1441 |
# Simulate waiting for other job |
|
1442 |
self.assertTrue(result) |
|
1443 |
self.assertTrue(job.cur_opctx) |
|
1444 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1445 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1446 |
self.assert_(job.start_timestamp) |
|
1447 |
self.assertFalse(job.end_timestamp) |
|
1448 |
continue |
|
1449 |
|
|
1450 |
if result: |
|
1451 |
# Last opcode |
|
1452 |
self.assertFalse(job.cur_opctx) |
|
1453 |
self.assertEqual(queue.depmgr.GetNextNotification(), job_id) |
|
1454 |
break |
|
1455 |
|
|
1456 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1457 |
|
|
1458 |
self.assertFalse(result) |
|
1459 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1460 |
self.assert_(job.start_timestamp) |
|
1461 |
self.assertFalse(job.end_timestamp) |
|
1462 |
|
|
1463 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) |
|
1464 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED]) |
|
1465 |
self.assertEqual(job.GetInfo(["opstatus", "opresult"]), |
|
1466 |
[[constants.OP_STATUS_SUCCESS, |
|
1467 |
constants.OP_STATUS_CANCELED, |
|
1468 |
constants.OP_STATUS_CANCELED], |
|
1469 |
["Res0", "Job canceled by request", |
|
1470 |
"Job canceled by request"]]) |
|
1471 |
|
|
1472 |
self._GenericCheckJob(job) |
|
1473 |
|
|
1474 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1475 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1476 |
self.assertFalse(depmgr.CountPendingResults()) |
|
1477 |
|
|
1478 |
# Calling the processor on a finished job should be a no-op |
|
1479 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) |
|
1480 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1481 |
|
|
1482 |
def testJobDependencyWrongstatus(self): |
|
1483 |
depmgr = _FakeDependencyManager() |
|
1484 |
queue = _FakeQueueForProc(depmgr=depmgr) |
|
1485 |
|
|
1486 |
self.assertEqual(queue.depmgr, depmgr) |
|
1487 |
|
|
1488 |
prev_job_id = 9741 |
|
1489 |
job_id = 11763 |
|
1490 |
ops = [ |
|
1491 |
opcodes.OpTestDummy(result="Res0", fail=False), |
|
1492 |
opcodes.OpTestDummy(result="Res1", fail=False, |
|
1493 |
depends=[ |
|
1494 |
[prev_job_id, None], |
|
1495 |
]), |
|
1496 |
opcodes.OpTestDummy(result="Res2", fail=False), |
|
1497 |
] |
|
1498 |
|
|
1499 |
# Create job |
|
1500 |
job = self._CreateJob(queue, job_id, ops) |
|
1501 |
|
|
1502 |
def _BeforeStart(timeout, priority): |
|
1503 |
if attempt == 0 or attempt > 5: |
|
1504 |
# Job should only be updated when it wasn't waiting for another job |
|
1505 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1506 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1507 |
self.assertFalse(queue.IsAcquired()) |
|
1508 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1509 |
self.assertFalse(job.cur_opctx) |
|
1510 |
|
|
1511 |
def _AfterStart(op, cbs): |
|
1512 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1513 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1514 |
|
|
1515 |
self.assertFalse(queue.IsAcquired()) |
|
1516 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
|
1517 |
self.assertFalse(job.cur_opctx) |
|
1518 |
|
|
1519 |
# Job is running, cancelling shouldn't be possible |
|
1520 |
(success, _) = job.Cancel() |
|
1521 |
self.assertFalse(success) |
|
1522 |
|
|
1523 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) |
|
1524 |
|
|
1525 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1526 |
|
|
1527 |
counter = itertools.count() |
|
1528 |
while True: |
|
1529 |
attempt = counter.next() |
|
1530 |
|
|
1531 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1532 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1533 |
|
|
1534 |
if attempt == 0: |
|
1535 |
# This will handle the first opcode |
|
1536 |
pass |
|
1537 |
elif attempt < 4: |
|
1538 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1539 |
(jqueue._JobDependencyManager.WAIT, "wait")) |
|
1540 |
elif attempt == 4: |
|
1541 |
# Other job failed |
|
1542 |
depmgr.AddCheckResult(job, prev_job_id, None, |
|
1543 |
(jqueue._JobDependencyManager.WRONGSTATUS, "w")) |
|
1544 |
|
|
1545 |
if attempt == 0: |
|
1546 |
self.assertEqual(depmgr.CountPendingResults(), 0) |
|
1547 |
else: |
|
1548 |
self.assertEqual(depmgr.CountPendingResults(), 1) |
|
1549 |
|
|
1550 |
result = jqueue._JobProcessor(queue, opexec, job)() |
|
1551 |
if attempt <= 1 or attempt >= 4: |
|
1552 |
# Job should only be updated if there was an actual change |
|
1553 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1554 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1555 |
self.assertFalse(depmgr.CountPendingResults()) |
|
1556 |
|
|
1557 |
if attempt > 0 and attempt < 4: |
|
1558 |
# Simulate waiting for other job |
|
1559 |
self.assertTrue(result) |
|
1560 |
self.assertTrue(job.cur_opctx) |
|
1561 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1562 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1563 |
self.assert_(job.start_timestamp) |
|
1564 |
self.assertFalse(job.end_timestamp) |
|
1565 |
continue |
|
1566 |
|
|
1567 |
if result: |
|
1568 |
# Last opcode |
|
1569 |
self.assertFalse(job.cur_opctx) |
|
1570 |
self.assertEqual(queue.depmgr.GetNextNotification(), job_id) |
|
1571 |
break |
|
1572 |
|
|
1573 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1574 |
|
|
1575 |
self.assertFalse(result) |
|
1576 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1577 |
self.assert_(job.start_timestamp) |
|
1578 |
self.assertFalse(job.end_timestamp) |
|
1579 |
|
|
1580 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR) |
|
1581 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR]) |
|
1582 |
self.assertEqual(job.GetInfo(["opstatus"]), |
|
1583 |
[[constants.OP_STATUS_SUCCESS, |
|
1584 |
constants.OP_STATUS_ERROR, |
|
1585 |
constants.OP_STATUS_ERROR]]), |
|
1586 |
|
|
1587 |
(opresult, ) = job.GetInfo(["opresult"]) |
|
1588 |
self.assertEqual(len(opresult), len(ops)) |
|
1589 |
self.assertEqual(opresult[0], "Res0") |
|
1590 |
self.assertTrue(errors.GetEncodedError(opresult[1])) |
|
1591 |
self.assertTrue(errors.GetEncodedError(opresult[2])) |
|
1592 |
|
|
1593 |
self._GenericCheckJob(job) |
|
1594 |
|
|
1595 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1596 |
self.assertRaises(IndexError, depmgr.GetNextNotification) |
|
1597 |
self.assertFalse(depmgr.CountPendingResults()) |
|
1598 |
|
|
1599 |
# Calling the processor on a finished job should be a no-op |
|
1600 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) |
|
1601 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1602 |
|
|
1182 | 1603 |
|
1183 | 1604 |
class _FakeTimeoutStrategy: |
1184 | 1605 |
def __init__(self, timeouts): |
... | ... | |
1412 | 1833 |
self.assertRaises(IndexError, self.queue.GetNextUpdate) |
1413 | 1834 |
|
1414 | 1835 |
|
1836 |
class TestJobDependencyManager(unittest.TestCase): |
|
1837 |
class _FakeJob: |
|
1838 |
def __init__(self, job_id): |
|
1839 |
self.id = str(job_id) |
|
1840 |
|
|
1841 |
def setUp(self): |
|
1842 |
self._status = [] |
|
1843 |
self._queue = [] |
|
1844 |
self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue) |
|
1845 |
|
|
1846 |
def _GetStatus(self, job_id): |
|
1847 |
(exp_job_id, result) = self._status.pop(0) |
|
1848 |
self.assertEqual(exp_job_id, job_id) |
|
1849 |
return result |
|
1850 |
|
|
1851 |
def _Enqueue(self, jobs): |
|
1852 |
self._queue.append(jobs) |
|
1853 |
|
|
1854 |
def testNotFinalizedThenCancel(self): |
|
1855 |
job = self._FakeJob(17697) |
|
1856 |
job_id = str(28625) |
|
1857 |
|
|
1858 |
self._status.append((job_id, constants.JOB_STATUS_RUNNING)) |
|
1859 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, []) |
|
1860 |
self.assertEqual(result, self.jdm.WAIT) |
|
1861 |
self.assertFalse(self._status) |
|
1862 |
self.assertFalse(self._queue) |
|
1863 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1864 |
self.assertEqual(self.jdm._waiters, { |
|
1865 |
job_id: set([job]), |
|
1866 |
}) |
|
1867 |
|
|
1868 |
self._status.append((job_id, constants.JOB_STATUS_CANCELED)) |
|
1869 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, []) |
|
1870 |
self.assertEqual(result, self.jdm.CANCEL) |
|
1871 |
self.assertFalse(self._status) |
|
1872 |
self.assertFalse(self._queue) |
|
1873 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
1874 |
|
|
1875 |
def testRequireCancel(self): |
|
1876 |
job = self._FakeJob(5278) |
|
1877 |
job_id = str(9610) |
|
1878 |
dep_status = [constants.JOB_STATUS_CANCELED] |
|
1879 |
|
|
1880 |
self._status.append((job_id, constants.JOB_STATUS_WAITLOCK)) |
|
1881 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status) |
|
1882 |
self.assertEqual(result, self.jdm.WAIT) |
|
1883 |
self.assertFalse(self._status) |
|
1884 |
self.assertFalse(self._queue) |
|
1885 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1886 |
self.assertEqual(self.jdm._waiters, { |
|
1887 |
job_id: set([job]), |
|
1888 |
}) |
|
1889 |
|
|
1890 |
self._status.append((job_id, constants.JOB_STATUS_CANCELED)) |
|
1891 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status) |
|
1892 |
self.assertEqual(result, self.jdm.CONTINUE) |
|
1893 |
self.assertFalse(self._status) |
|
1894 |
self.assertFalse(self._queue) |
|
1895 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
1896 |
|
|
1897 |
def testRequireError(self): |
|
1898 |
job = self._FakeJob(21459) |
|
1899 |
job_id = str(25519) |
|
1900 |
dep_status = [constants.JOB_STATUS_ERROR] |
|
1901 |
|
|
1902 |
self._status.append((job_id, constants.JOB_STATUS_WAITLOCK)) |
|
1903 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status) |
|
1904 |
self.assertEqual(result, self.jdm.WAIT) |
|
1905 |
self.assertFalse(self._status) |
|
1906 |
self.assertFalse(self._queue) |
|
1907 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1908 |
self.assertEqual(self.jdm._waiters, { |
|
1909 |
job_id: set([job]), |
|
1910 |
}) |
|
1911 |
|
|
1912 |
self._status.append((job_id, constants.JOB_STATUS_ERROR)) |
|
1913 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status) |
|
1914 |
self.assertEqual(result, self.jdm.CONTINUE) |
|
1915 |
self.assertFalse(self._status) |
|
1916 |
self.assertFalse(self._queue) |
|
1917 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
1918 |
|
|
1919 |
def testRequireMultiple(self): |
|
1920 |
dep_status = list(constants.JOBS_FINALIZED) |
|
1921 |
|
|
1922 |
for end_status in dep_status: |
|
1923 |
job = self._FakeJob(21343) |
|
1924 |
job_id = str(14609) |
|
1925 |
|
|
1926 |
self._status.append((job_id, constants.JOB_STATUS_WAITLOCK)) |
|
1927 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status) |
|
1928 |
self.assertEqual(result, self.jdm.WAIT) |
|
1929 |
self.assertFalse(self._status) |
|
1930 |
self.assertFalse(self._queue) |
|
1931 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1932 |
self.assertEqual(self.jdm._waiters, { |
|
1933 |
job_id: set([job]), |
|
1934 |
}) |
|
1935 |
|
|
1936 |
self._status.append((job_id, end_status)) |
|
1937 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status) |
|
1938 |
self.assertEqual(result, self.jdm.CONTINUE) |
|
1939 |
self.assertFalse(self._status) |
|
1940 |
self.assertFalse(self._queue) |
|
1941 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
1942 |
|
|
1943 |
def testNotify(self): |
|
1944 |
job = self._FakeJob(8227) |
|
1945 |
job_id = str(4113) |
|
1946 |
|
|
1947 |
self._status.append((job_id, constants.JOB_STATUS_RUNNING)) |
|
1948 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, []) |
|
1949 |
self.assertEqual(result, self.jdm.WAIT) |
|
1950 |
self.assertFalse(self._status) |
|
1951 |
self.assertFalse(self._queue) |
|
1952 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1953 |
self.assertEqual(self.jdm._waiters, { |
|
1954 |
job_id: set([job]), |
|
1955 |
}) |
|
1956 |
|
|
1957 |
self.jdm.NotifyWaiters(job_id) |
|
1958 |
self.assertFalse(self._status) |
|
1959 |
self.assertFalse(self.jdm._waiters) |
|
1960 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
1961 |
self.assertEqual(self._queue, [set([job])]) |
|
1962 |
|
|
1963 |
def testWrongStatus(self): |
|
1964 |
job = self._FakeJob(10102) |
|
1965 |
job_id = str(1271) |
|
1966 |
|
|
1967 |
self._status.append((job_id, constants.JOB_STATUS_QUEUED)) |
|
1968 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, |
|
1969 |
[constants.JOB_STATUS_SUCCESS]) |
|
1970 |
self.assertEqual(result, self.jdm.WAIT) |
|
1971 |
self.assertFalse(self._status) |
|
1972 |
self.assertFalse(self._queue) |
|
1973 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1974 |
self.assertEqual(self.jdm._waiters, { |
|
1975 |
job_id: set([job]), |
|
1976 |
}) |
|
1977 |
|
|
1978 |
self._status.append((job_id, constants.JOB_STATUS_ERROR)) |
|
1979 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, |
|
1980 |
[constants.JOB_STATUS_SUCCESS]) |
|
1981 |
self.assertEqual(result, self.jdm.WRONGSTATUS) |
|
1982 |
self.assertFalse(self._status) |
|
1983 |
self.assertFalse(self._queue) |
|
1984 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
1985 |
|
|
1986 |
def testCorrectStatus(self): |
|
1987 |
job = self._FakeJob(24273) |
|
1988 |
job_id = str(23885) |
|
1989 |
|
|
1990 |
self._status.append((job_id, constants.JOB_STATUS_QUEUED)) |
|
1991 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, |
|
1992 |
[constants.JOB_STATUS_SUCCESS]) |
|
1993 |
self.assertEqual(result, self.jdm.WAIT) |
|
1994 |
self.assertFalse(self._status) |
|
1995 |
self.assertFalse(self._queue) |
|
1996 |
self.assertTrue(self.jdm.JobWaiting(job)) |
|
1997 |
self.assertEqual(self.jdm._waiters, { |
|
1998 |
job_id: set([job]), |
|
1999 |
}) |
|
2000 |
|
|
2001 |
self._status.append((job_id, constants.JOB_STATUS_SUCCESS)) |
|
2002 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, |
|
2003 |
[constants.JOB_STATUS_SUCCESS]) |
|
2004 |
self.assertEqual(result, self.jdm.CONTINUE) |
|
2005 |
self.assertFalse(self._status) |
|
2006 |
self.assertFalse(self._queue) |
|
2007 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
2008 |
|
|
2009 |
def testFinalizedRightAway(self): |
|
2010 |
job = self._FakeJob(224) |
|
2011 |
job_id = str(3081) |
|
2012 |
|
|
2013 |
self._status.append((job_id, constants.JOB_STATUS_SUCCESS)) |
|
2014 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, |
|
2015 |
[constants.JOB_STATUS_SUCCESS]) |
|
2016 |
self.assertEqual(result, self.jdm.CONTINUE) |
|
2017 |
self.assertFalse(self._status) |
|
2018 |
self.assertFalse(self._queue) |
|
2019 |
self.assertFalse(self.jdm.JobWaiting(job)) |
|
2020 |
self.assertEqual(self.jdm._waiters, { |
|
2021 |
job_id: set(), |
|
2022 |
}) |
|
2023 |
|
|
2024 |
# Force cleanup |
|
2025 |
self.jdm.NotifyWaiters("0") |
|
2026 |
self.assertFalse(self.jdm._waiters) |
|
2027 |
self.assertFalse(self._status) |
|
2028 |
self.assertFalse(self._queue) |
|
2029 |
|
|
2030 |
def testSelfDependency(self): |
|
2031 |
job = self._FakeJob(18937) |
|
2032 |
|
|
2033 |
self._status.append((job.id, constants.JOB_STATUS_SUCCESS)) |
|
2034 |
(result, _) = self.jdm.CheckAndRegister(job, job.id, []) |
|
2035 |
self.assertEqual(result, self.jdm.ERROR) |
|
2036 |
|
|
2037 |
def testJobDisappears(self): |
|
2038 |
job = self._FakeJob(30540) |
|
2039 |
job_id = str(23769) |
|
2040 |
|
|
2041 |
def _FakeStatus(_): |
|
2042 |
raise errors.JobLost("#msg#") |
|
2043 |
|
|
2044 |
jdm = jqueue._JobDependencyManager(_FakeStatus, None) |
|
2045 |
(result, _) = jdm.CheckAndRegister(job, job_id, []) |
|
2046 |
self.assertEqual(result, self.jdm.ERROR) |
|
2047 |
self.assertFalse(jdm.JobWaiting(job)) |
|
2048 |
|
|
2049 |
|
|
1415 | 2050 |
if __name__ == "__main__": |
1416 | 2051 |
testutils.GanetiTestProgram() |
Also available in: Unified diff