424 |
424 |
self.assertEqual(job.CalcStatus(), status)
|
425 |
425 |
|
426 |
426 |
|
427 |
|
class _FakeQueueForProc:
|
|
427 |
class _FakeDependencyManager:
|
428 |
428 |
def __init__(self):
|
|
429 |
self._checks = []
|
|
430 |
self._notifications = []
|
|
431 |
self._waiting = set()
|
|
432 |
|
|
433 |
def AddCheckResult(self, job, dep_job_id, dep_status, result):
|
|
434 |
self._checks.append((job, dep_job_id, dep_status, result))
|
|
435 |
|
|
436 |
def CountPendingResults(self):
|
|
437 |
return len(self._checks)
|
|
438 |
|
|
439 |
def CountWaitingJobs(self):
|
|
440 |
return len(self._waiting)
|
|
441 |
|
|
442 |
def GetNextNotification(self):
|
|
443 |
return self._notifications.pop(0)
|
|
444 |
|
|
445 |
def JobWaiting(self, job):
|
|
446 |
return job in self._waiting
|
|
447 |
|
|
448 |
def CheckAndRegister(self, job, dep_job_id, dep_status):
|
|
449 |
(exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
|
|
450 |
|
|
451 |
assert exp_job == job
|
|
452 |
assert exp_dep_job_id == dep_job_id
|
|
453 |
assert exp_dep_status == dep_status
|
|
454 |
|
|
455 |
(result_status, _) = result
|
|
456 |
|
|
457 |
if result_status == jqueue._JobDependencyManager.WAIT:
|
|
458 |
self._waiting.add(job)
|
|
459 |
elif result_status == jqueue._JobDependencyManager.CONTINUE:
|
|
460 |
self._waiting.remove(job)
|
|
461 |
|
|
462 |
return result
|
|
463 |
|
|
464 |
def NotifyWaiters(self, job_id):
|
|
465 |
self._notifications.append(job_id)
|
|
466 |
|
|
467 |
|
|
468 |
class _DisabledFakeDependencyManager:
|
|
469 |
def JobWaiting(self, _):
|
|
470 |
return False
|
|
471 |
|
|
472 |
def CheckAndRegister(self, *args):
|
|
473 |
assert False, "Should not be called"
|
|
474 |
|
|
475 |
def NotifyWaiters(self, _):
|
|
476 |
pass
|
|
477 |
|
|
478 |
|
|
479 |
class _FakeQueueForProc:
|
|
480 |
def __init__(self, depmgr=None):
|
429 |
481 |
self._acquired = False
|
430 |
482 |
self._updates = []
|
431 |
483 |
self._submitted = []
|
432 |
484 |
|
433 |
485 |
self._submit_count = itertools.count(1000)
|
434 |
486 |
|
|
487 |
if depmgr:
|
|
488 |
self.depmgr = depmgr
|
|
489 |
else:
|
|
490 |
self.depmgr = _DisabledFakeDependencyManager()
|
|
491 |
|
435 |
492 |
def IsAcquired(self):
|
436 |
493 |
return self._acquired
|
437 |
494 |
|
... | ... | |
960 |
1017 |
|
961 |
1018 |
# ... also after being restored
|
962 |
1019 |
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
|
|
1020 |
# Calling the processor on a finished job should be a no-op
|
963 |
1021 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
|
964 |
1022 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
965 |
1023 |
|
... | ... | |
1179 |
1237 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
|
1180 |
1238 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
1181 |
1239 |
|
|
1240 |
def testJobDependency(self):
|
|
1241 |
depmgr = _FakeDependencyManager()
|
|
1242 |
queue = _FakeQueueForProc(depmgr=depmgr)
|
|
1243 |
|
|
1244 |
self.assertEqual(queue.depmgr, depmgr)
|
|
1245 |
|
|
1246 |
prev_job_id = 22113
|
|
1247 |
prev_job_id2 = 28102
|
|
1248 |
job_id = 29929
|
|
1249 |
ops = [
|
|
1250 |
opcodes.OpTestDummy(result="Res0", fail=False,
|
|
1251 |
depends=[
|
|
1252 |
[prev_job_id2, None],
|
|
1253 |
[prev_job_id, None],
|
|
1254 |
]),
|
|
1255 |
opcodes.OpTestDummy(result="Res1", fail=False),
|
|
1256 |
]
|
|
1257 |
|
|
1258 |
# Create job
|
|
1259 |
job = self._CreateJob(queue, job_id, ops)
|
|
1260 |
|
|
1261 |
def _BeforeStart(timeout, priority):
|
|
1262 |
if attempt == 0 or attempt > 5:
|
|
1263 |
# Job should only be updated when it wasn't waiting for another job
|
|
1264 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1265 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1266 |
self.assertFalse(queue.IsAcquired())
|
|
1267 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
|
|
1268 |
self.assertFalse(job.cur_opctx)
|
|
1269 |
|
|
1270 |
def _AfterStart(op, cbs):
|
|
1271 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1272 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1273 |
|
|
1274 |
self.assertFalse(queue.IsAcquired())
|
|
1275 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
|
|
1276 |
self.assertFalse(job.cur_opctx)
|
|
1277 |
|
|
1278 |
# Job is running, cancelling shouldn't be possible
|
|
1279 |
(success, _) = job.Cancel()
|
|
1280 |
self.assertFalse(success)
|
|
1281 |
|
|
1282 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
|
|
1283 |
|
|
1284 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
1285 |
|
|
1286 |
counter = itertools.count()
|
|
1287 |
while True:
|
|
1288 |
attempt = counter.next()
|
|
1289 |
|
|
1290 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1291 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1292 |
|
|
1293 |
if attempt < 2:
|
|
1294 |
depmgr.AddCheckResult(job, prev_job_id2, None,
|
|
1295 |
(jqueue._JobDependencyManager.WAIT, "wait2"))
|
|
1296 |
elif attempt == 2:
|
|
1297 |
depmgr.AddCheckResult(job, prev_job_id2, None,
|
|
1298 |
(jqueue._JobDependencyManager.CONTINUE, "cont"))
|
|
1299 |
# The processor will ask for the next dependency immediately
|
|
1300 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1301 |
(jqueue._JobDependencyManager.WAIT, "wait"))
|
|
1302 |
elif attempt < 5:
|
|
1303 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1304 |
(jqueue._JobDependencyManager.WAIT, "wait"))
|
|
1305 |
elif attempt == 5:
|
|
1306 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1307 |
(jqueue._JobDependencyManager.CONTINUE, "cont"))
|
|
1308 |
if attempt == 2:
|
|
1309 |
self.assertEqual(depmgr.CountPendingResults(), 2)
|
|
1310 |
elif attempt > 5:
|
|
1311 |
self.assertEqual(depmgr.CountPendingResults(), 0)
|
|
1312 |
else:
|
|
1313 |
self.assertEqual(depmgr.CountPendingResults(), 1)
|
|
1314 |
|
|
1315 |
result = jqueue._JobProcessor(queue, opexec, job)()
|
|
1316 |
if attempt == 0 or attempt >= 5:
|
|
1317 |
# Job should only be updated if there was an actual change
|
|
1318 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1319 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1320 |
self.assertFalse(depmgr.CountPendingResults())
|
|
1321 |
|
|
1322 |
if attempt < 5:
|
|
1323 |
# Simulate waiting for other job
|
|
1324 |
self.assertTrue(result)
|
|
1325 |
self.assertTrue(job.cur_opctx)
|
|
1326 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
|
|
1327 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1328 |
self.assert_(job.start_timestamp)
|
|
1329 |
self.assertFalse(job.end_timestamp)
|
|
1330 |
continue
|
|
1331 |
|
|
1332 |
if result:
|
|
1333 |
# Last opcode
|
|
1334 |
self.assertFalse(job.cur_opctx)
|
|
1335 |
self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
|
|
1336 |
break
|
|
1337 |
|
|
1338 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1339 |
|
|
1340 |
self.assertFalse(result)
|
|
1341 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
1342 |
self.assert_(job.start_timestamp)
|
|
1343 |
self.assertFalse(job.end_timestamp)
|
|
1344 |
|
|
1345 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
|
|
1346 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
|
|
1347 |
self.assertEqual(job.GetInfo(["opresult"]),
|
|
1348 |
[[op.input.result for op in job.ops]])
|
|
1349 |
self.assertEqual(job.GetInfo(["opstatus"]),
|
|
1350 |
[len(job.ops) * [constants.OP_STATUS_SUCCESS]])
|
|
1351 |
self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
|
|
1352 |
for op in job.ops))
|
|
1353 |
|
|
1354 |
self._GenericCheckJob(job)
|
|
1355 |
|
|
1356 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1357 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1358 |
self.assertFalse(depmgr.CountPendingResults())
|
|
1359 |
self.assertFalse(depmgr.CountWaitingJobs())
|
|
1360 |
|
|
1361 |
# Calling the processor on a finished job should be a no-op
|
|
1362 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
|
|
1363 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1364 |
|
|
1365 |
def testJobDependencyCancel(self):
|
|
1366 |
depmgr = _FakeDependencyManager()
|
|
1367 |
queue = _FakeQueueForProc(depmgr=depmgr)
|
|
1368 |
|
|
1369 |
self.assertEqual(queue.depmgr, depmgr)
|
|
1370 |
|
|
1371 |
prev_job_id = 13623
|
|
1372 |
job_id = 30876
|
|
1373 |
ops = [
|
|
1374 |
opcodes.OpTestDummy(result="Res0", fail=False),
|
|
1375 |
opcodes.OpTestDummy(result="Res1", fail=False,
|
|
1376 |
depends=[
|
|
1377 |
[prev_job_id, None],
|
|
1378 |
]),
|
|
1379 |
opcodes.OpTestDummy(result="Res2", fail=False),
|
|
1380 |
]
|
|
1381 |
|
|
1382 |
# Create job
|
|
1383 |
job = self._CreateJob(queue, job_id, ops)
|
|
1384 |
|
|
1385 |
def _BeforeStart(timeout, priority):
|
|
1386 |
if attempt == 0 or attempt > 5:
|
|
1387 |
# Job should only be updated when it wasn't waiting for another job
|
|
1388 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1389 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1390 |
self.assertFalse(queue.IsAcquired())
|
|
1391 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
|
|
1392 |
self.assertFalse(job.cur_opctx)
|
|
1393 |
|
|
1394 |
def _AfterStart(op, cbs):
|
|
1395 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1396 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1397 |
|
|
1398 |
self.assertFalse(queue.IsAcquired())
|
|
1399 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
|
|
1400 |
self.assertFalse(job.cur_opctx)
|
|
1401 |
|
|
1402 |
# Job is running, cancelling shouldn't be possible
|
|
1403 |
(success, _) = job.Cancel()
|
|
1404 |
self.assertFalse(success)
|
|
1405 |
|
|
1406 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
|
|
1407 |
|
|
1408 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
1409 |
|
|
1410 |
counter = itertools.count()
|
|
1411 |
while True:
|
|
1412 |
attempt = counter.next()
|
|
1413 |
|
|
1414 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1415 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1416 |
|
|
1417 |
if attempt == 0:
|
|
1418 |
# This will handle the first opcode
|
|
1419 |
pass
|
|
1420 |
elif attempt < 4:
|
|
1421 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1422 |
(jqueue._JobDependencyManager.WAIT, "wait"))
|
|
1423 |
elif attempt == 4:
|
|
1424 |
# Other job was cancelled
|
|
1425 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1426 |
(jqueue._JobDependencyManager.CANCEL, "cancel"))
|
|
1427 |
|
|
1428 |
if attempt == 0:
|
|
1429 |
self.assertEqual(depmgr.CountPendingResults(), 0)
|
|
1430 |
else:
|
|
1431 |
self.assertEqual(depmgr.CountPendingResults(), 1)
|
|
1432 |
|
|
1433 |
result = jqueue._JobProcessor(queue, opexec, job)()
|
|
1434 |
if attempt <= 1 or attempt >= 4:
|
|
1435 |
# Job should only be updated if there was an actual change
|
|
1436 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1437 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1438 |
self.assertFalse(depmgr.CountPendingResults())
|
|
1439 |
|
|
1440 |
if attempt > 0 and attempt < 4:
|
|
1441 |
# Simulate waiting for other job
|
|
1442 |
self.assertTrue(result)
|
|
1443 |
self.assertTrue(job.cur_opctx)
|
|
1444 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
|
|
1445 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1446 |
self.assert_(job.start_timestamp)
|
|
1447 |
self.assertFalse(job.end_timestamp)
|
|
1448 |
continue
|
|
1449 |
|
|
1450 |
if result:
|
|
1451 |
# Last opcode
|
|
1452 |
self.assertFalse(job.cur_opctx)
|
|
1453 |
self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
|
|
1454 |
break
|
|
1455 |
|
|
1456 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1457 |
|
|
1458 |
self.assertFalse(result)
|
|
1459 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
1460 |
self.assert_(job.start_timestamp)
|
|
1461 |
self.assertFalse(job.end_timestamp)
|
|
1462 |
|
|
1463 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
|
|
1464 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
|
|
1465 |
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
|
|
1466 |
[[constants.OP_STATUS_SUCCESS,
|
|
1467 |
constants.OP_STATUS_CANCELED,
|
|
1468 |
constants.OP_STATUS_CANCELED],
|
|
1469 |
["Res0", "Job canceled by request",
|
|
1470 |
"Job canceled by request"]])
|
|
1471 |
|
|
1472 |
self._GenericCheckJob(job)
|
|
1473 |
|
|
1474 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1475 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1476 |
self.assertFalse(depmgr.CountPendingResults())
|
|
1477 |
|
|
1478 |
# Calling the processor on a finished job should be a no-op
|
|
1479 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
|
|
1480 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1481 |
|
|
1482 |
def testJobDependencyWrongstatus(self):
|
|
1483 |
depmgr = _FakeDependencyManager()
|
|
1484 |
queue = _FakeQueueForProc(depmgr=depmgr)
|
|
1485 |
|
|
1486 |
self.assertEqual(queue.depmgr, depmgr)
|
|
1487 |
|
|
1488 |
prev_job_id = 9741
|
|
1489 |
job_id = 11763
|
|
1490 |
ops = [
|
|
1491 |
opcodes.OpTestDummy(result="Res0", fail=False),
|
|
1492 |
opcodes.OpTestDummy(result="Res1", fail=False,
|
|
1493 |
depends=[
|
|
1494 |
[prev_job_id, None],
|
|
1495 |
]),
|
|
1496 |
opcodes.OpTestDummy(result="Res2", fail=False),
|
|
1497 |
]
|
|
1498 |
|
|
1499 |
# Create job
|
|
1500 |
job = self._CreateJob(queue, job_id, ops)
|
|
1501 |
|
|
1502 |
def _BeforeStart(timeout, priority):
|
|
1503 |
if attempt == 0 or attempt > 5:
|
|
1504 |
# Job should only be updated when it wasn't waiting for another job
|
|
1505 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1506 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1507 |
self.assertFalse(queue.IsAcquired())
|
|
1508 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
|
|
1509 |
self.assertFalse(job.cur_opctx)
|
|
1510 |
|
|
1511 |
def _AfterStart(op, cbs):
|
|
1512 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1513 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1514 |
|
|
1515 |
self.assertFalse(queue.IsAcquired())
|
|
1516 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
|
|
1517 |
self.assertFalse(job.cur_opctx)
|
|
1518 |
|
|
1519 |
# Job is running, cancelling shouldn't be possible
|
|
1520 |
(success, _) = job.Cancel()
|
|
1521 |
self.assertFalse(success)
|
|
1522 |
|
|
1523 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
|
|
1524 |
|
|
1525 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
1526 |
|
|
1527 |
counter = itertools.count()
|
|
1528 |
while True:
|
|
1529 |
attempt = counter.next()
|
|
1530 |
|
|
1531 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1532 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1533 |
|
|
1534 |
if attempt == 0:
|
|
1535 |
# This will handle the first opcode
|
|
1536 |
pass
|
|
1537 |
elif attempt < 4:
|
|
1538 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1539 |
(jqueue._JobDependencyManager.WAIT, "wait"))
|
|
1540 |
elif attempt == 4:
|
|
1541 |
# Other job failed
|
|
1542 |
depmgr.AddCheckResult(job, prev_job_id, None,
|
|
1543 |
(jqueue._JobDependencyManager.WRONGSTATUS, "w"))
|
|
1544 |
|
|
1545 |
if attempt == 0:
|
|
1546 |
self.assertEqual(depmgr.CountPendingResults(), 0)
|
|
1547 |
else:
|
|
1548 |
self.assertEqual(depmgr.CountPendingResults(), 1)
|
|
1549 |
|
|
1550 |
result = jqueue._JobProcessor(queue, opexec, job)()
|
|
1551 |
if attempt <= 1 or attempt >= 4:
|
|
1552 |
# Job should only be updated if there was an actual change
|
|
1553 |
self.assertEqual(queue.GetNextUpdate(), (job, True))
|
|
1554 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1555 |
self.assertFalse(depmgr.CountPendingResults())
|
|
1556 |
|
|
1557 |
if attempt > 0 and attempt < 4:
|
|
1558 |
# Simulate waiting for other job
|
|
1559 |
self.assertTrue(result)
|
|
1560 |
self.assertTrue(job.cur_opctx)
|
|
1561 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
|
|
1562 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1563 |
self.assert_(job.start_timestamp)
|
|
1564 |
self.assertFalse(job.end_timestamp)
|
|
1565 |
continue
|
|
1566 |
|
|
1567 |
if result:
|
|
1568 |
# Last opcode
|
|
1569 |
self.assertFalse(job.cur_opctx)
|
|
1570 |
self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
|
|
1571 |
break
|
|
1572 |
|
|
1573 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1574 |
|
|
1575 |
self.assertFalse(result)
|
|
1576 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
1577 |
self.assert_(job.start_timestamp)
|
|
1578 |
self.assertFalse(job.end_timestamp)
|
|
1579 |
|
|
1580 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
|
|
1581 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
|
|
1582 |
self.assertEqual(job.GetInfo(["opstatus"]),
|
|
1583 |
[[constants.OP_STATUS_SUCCESS,
|
|
1584 |
constants.OP_STATUS_ERROR,
|
|
1585 |
constants.OP_STATUS_ERROR]]),
|
|
1586 |
|
|
1587 |
(opresult, ) = job.GetInfo(["opresult"])
|
|
1588 |
self.assertEqual(len(opresult), len(ops))
|
|
1589 |
self.assertEqual(opresult[0], "Res0")
|
|
1590 |
self.assertTrue(errors.GetEncodedError(opresult[1]))
|
|
1591 |
self.assertTrue(errors.GetEncodedError(opresult[2]))
|
|
1592 |
|
|
1593 |
self._GenericCheckJob(job)
|
|
1594 |
|
|
1595 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1596 |
self.assertRaises(IndexError, depmgr.GetNextNotification)
|
|
1597 |
self.assertFalse(depmgr.CountPendingResults())
|
|
1598 |
|
|
1599 |
# Calling the processor on a finished job should be a no-op
|
|
1600 |
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
|
|
1601 |
self.assertRaises(IndexError, queue.GetNextUpdate)
|
|
1602 |
|
1182 |
1603 |
|
1183 |
1604 |
class _FakeTimeoutStrategy:
|
1184 |
1605 |
def __init__(self, timeouts):
|
... | ... | |
1412 |
1833 |
self.assertRaises(IndexError, self.queue.GetNextUpdate)
|
1413 |
1834 |
|
1414 |
1835 |
|
|
1836 |
class TestJobDependencyManager(unittest.TestCase):
|
|
1837 |
class _FakeJob:
|
|
1838 |
def __init__(self, job_id):
|
|
1839 |
self.id = str(job_id)
|
|
1840 |
|
|
1841 |
def setUp(self):
|
|
1842 |
self._status = []
|
|
1843 |
self._queue = []
|
|
1844 |
self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
|
|
1845 |
|
|
1846 |
def _GetStatus(self, job_id):
|
|
1847 |
(exp_job_id, result) = self._status.pop(0)
|
|
1848 |
self.assertEqual(exp_job_id, job_id)
|
|
1849 |
return result
|
|
1850 |
|
|
1851 |
def _Enqueue(self, jobs):
|
|
1852 |
self._queue.append(jobs)
|
|
1853 |
|
|
1854 |
def testNotFinalizedThenCancel(self):
|
|
1855 |
job = self._FakeJob(17697)
|
|
1856 |
job_id = str(28625)
|
|
1857 |
|
|
1858 |
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
|
|
1859 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
|
|
1860 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1861 |
self.assertFalse(self._status)
|
|
1862 |
self.assertFalse(self._queue)
|
|
1863 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1864 |
self.assertEqual(self.jdm._waiters, {
|
|
1865 |
job_id: set([job]),
|
|
1866 |
})
|
|
1867 |
|
|
1868 |
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
|
|
1869 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
|
|
1870 |
self.assertEqual(result, self.jdm.CANCEL)
|
|
1871 |
self.assertFalse(self._status)
|
|
1872 |
self.assertFalse(self._queue)
|
|
1873 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
1874 |
|
|
1875 |
def testRequireCancel(self):
|
|
1876 |
job = self._FakeJob(5278)
|
|
1877 |
job_id = str(9610)
|
|
1878 |
dep_status = [constants.JOB_STATUS_CANCELED]
|
|
1879 |
|
|
1880 |
self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
|
|
1881 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
|
|
1882 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1883 |
self.assertFalse(self._status)
|
|
1884 |
self.assertFalse(self._queue)
|
|
1885 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1886 |
self.assertEqual(self.jdm._waiters, {
|
|
1887 |
job_id: set([job]),
|
|
1888 |
})
|
|
1889 |
|
|
1890 |
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
|
|
1891 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
|
|
1892 |
self.assertEqual(result, self.jdm.CONTINUE)
|
|
1893 |
self.assertFalse(self._status)
|
|
1894 |
self.assertFalse(self._queue)
|
|
1895 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
1896 |
|
|
1897 |
def testRequireError(self):
|
|
1898 |
job = self._FakeJob(21459)
|
|
1899 |
job_id = str(25519)
|
|
1900 |
dep_status = [constants.JOB_STATUS_ERROR]
|
|
1901 |
|
|
1902 |
self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
|
|
1903 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
|
|
1904 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1905 |
self.assertFalse(self._status)
|
|
1906 |
self.assertFalse(self._queue)
|
|
1907 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1908 |
self.assertEqual(self.jdm._waiters, {
|
|
1909 |
job_id: set([job]),
|
|
1910 |
})
|
|
1911 |
|
|
1912 |
self._status.append((job_id, constants.JOB_STATUS_ERROR))
|
|
1913 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
|
|
1914 |
self.assertEqual(result, self.jdm.CONTINUE)
|
|
1915 |
self.assertFalse(self._status)
|
|
1916 |
self.assertFalse(self._queue)
|
|
1917 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
1918 |
|
|
1919 |
def testRequireMultiple(self):
|
|
1920 |
dep_status = list(constants.JOBS_FINALIZED)
|
|
1921 |
|
|
1922 |
for end_status in dep_status:
|
|
1923 |
job = self._FakeJob(21343)
|
|
1924 |
job_id = str(14609)
|
|
1925 |
|
|
1926 |
self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
|
|
1927 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
|
|
1928 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1929 |
self.assertFalse(self._status)
|
|
1930 |
self.assertFalse(self._queue)
|
|
1931 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1932 |
self.assertEqual(self.jdm._waiters, {
|
|
1933 |
job_id: set([job]),
|
|
1934 |
})
|
|
1935 |
|
|
1936 |
self._status.append((job_id, end_status))
|
|
1937 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
|
|
1938 |
self.assertEqual(result, self.jdm.CONTINUE)
|
|
1939 |
self.assertFalse(self._status)
|
|
1940 |
self.assertFalse(self._queue)
|
|
1941 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
1942 |
|
|
1943 |
def testNotify(self):
|
|
1944 |
job = self._FakeJob(8227)
|
|
1945 |
job_id = str(4113)
|
|
1946 |
|
|
1947 |
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
|
|
1948 |
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
|
|
1949 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1950 |
self.assertFalse(self._status)
|
|
1951 |
self.assertFalse(self._queue)
|
|
1952 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1953 |
self.assertEqual(self.jdm._waiters, {
|
|
1954 |
job_id: set([job]),
|
|
1955 |
})
|
|
1956 |
|
|
1957 |
self.jdm.NotifyWaiters(job_id)
|
|
1958 |
self.assertFalse(self._status)
|
|
1959 |
self.assertFalse(self.jdm._waiters)
|
|
1960 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
1961 |
self.assertEqual(self._queue, [set([job])])
|
|
1962 |
|
|
1963 |
def testWrongStatus(self):
|
|
1964 |
job = self._FakeJob(10102)
|
|
1965 |
job_id = str(1271)
|
|
1966 |
|
|
1967 |
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
|
|
1968 |
(result, _) = self.jdm.CheckAndRegister(job, job_id,
|
|
1969 |
[constants.JOB_STATUS_SUCCESS])
|
|
1970 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1971 |
self.assertFalse(self._status)
|
|
1972 |
self.assertFalse(self._queue)
|
|
1973 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1974 |
self.assertEqual(self.jdm._waiters, {
|
|
1975 |
job_id: set([job]),
|
|
1976 |
})
|
|
1977 |
|
|
1978 |
self._status.append((job_id, constants.JOB_STATUS_ERROR))
|
|
1979 |
(result, _) = self.jdm.CheckAndRegister(job, job_id,
|
|
1980 |
[constants.JOB_STATUS_SUCCESS])
|
|
1981 |
self.assertEqual(result, self.jdm.WRONGSTATUS)
|
|
1982 |
self.assertFalse(self._status)
|
|
1983 |
self.assertFalse(self._queue)
|
|
1984 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
1985 |
|
|
1986 |
def testCorrectStatus(self):
|
|
1987 |
job = self._FakeJob(24273)
|
|
1988 |
job_id = str(23885)
|
|
1989 |
|
|
1990 |
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
|
|
1991 |
(result, _) = self.jdm.CheckAndRegister(job, job_id,
|
|
1992 |
[constants.JOB_STATUS_SUCCESS])
|
|
1993 |
self.assertEqual(result, self.jdm.WAIT)
|
|
1994 |
self.assertFalse(self._status)
|
|
1995 |
self.assertFalse(self._queue)
|
|
1996 |
self.assertTrue(self.jdm.JobWaiting(job))
|
|
1997 |
self.assertEqual(self.jdm._waiters, {
|
|
1998 |
job_id: set([job]),
|
|
1999 |
})
|
|
2000 |
|
|
2001 |
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
|
|
2002 |
(result, _) = self.jdm.CheckAndRegister(job, job_id,
|
|
2003 |
[constants.JOB_STATUS_SUCCESS])
|
|
2004 |
self.assertEqual(result, self.jdm.CONTINUE)
|
|
2005 |
self.assertFalse(self._status)
|
|
2006 |
self.assertFalse(self._queue)
|
|
2007 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
2008 |
|
|
2009 |
def testFinalizedRightAway(self):
|
|
2010 |
job = self._FakeJob(224)
|
|
2011 |
job_id = str(3081)
|
|
2012 |
|
|
2013 |
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
|
|
2014 |
(result, _) = self.jdm.CheckAndRegister(job, job_id,
|
|
2015 |
[constants.JOB_STATUS_SUCCESS])
|
|
2016 |
self.assertEqual(result, self.jdm.CONTINUE)
|
|
2017 |
self.assertFalse(self._status)
|
|
2018 |
self.assertFalse(self._queue)
|
|
2019 |
self.assertFalse(self.jdm.JobWaiting(job))
|
|
2020 |
self.assertEqual(self.jdm._waiters, {
|
|
2021 |
job_id: set(),
|
|
2022 |
})
|
|
2023 |
|
|
2024 |
# Force cleanup
|
|
2025 |
self.jdm.NotifyWaiters("0")
|
|
2026 |
self.assertFalse(self.jdm._waiters)
|
|
2027 |
self.assertFalse(self._status)
|
|
2028 |
self.assertFalse(self._queue)
|
|
2029 |
|
|
2030 |
def testSelfDependency(self):
|
|
2031 |
job = self._FakeJob(18937)
|
|
2032 |
|
|
2033 |
self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
|
|
2034 |
(result, _) = self.jdm.CheckAndRegister(job, job.id, [])
|
|
2035 |
self.assertEqual(result, self.jdm.ERROR)
|
|
2036 |
|
|
2037 |
def testJobDisappears(self):
|
|
2038 |
job = self._FakeJob(30540)
|
|
2039 |
job_id = str(23769)
|
|
2040 |
|
|
2041 |
def _FakeStatus(_):
|
|
2042 |
raise errors.JobLost("#msg#")
|
|
2043 |
|
|
2044 |
jdm = jqueue._JobDependencyManager(_FakeStatus, None)
|
|
2045 |
(result, _) = jdm.CheckAndRegister(job, job_id, [])
|
|
2046 |
self.assertEqual(result, self.jdm.ERROR)
|
|
2047 |
self.assertFalse(jdm.JobWaiting(job))
|
|
2048 |
|
|
2049 |
|
1415 |
2050 |
if __name__ == "__main__":
|
1416 |
2051 |
testutils.GanetiTestProgram()
|