Revision b95479a5 test/ganeti.jqueue_unittest.py

b/test/ganeti.jqueue_unittest.py
424 424
        self.assertEqual(job.CalcStatus(), status)
425 425

  
426 426

  
427
class _FakeQueueForProc:
427
class _FakeDependencyManager:
428 428
  def __init__(self):
429
    self._checks = []
430
    self._notifications = []
431
    self._waiting = set()
432

  
433
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
434
    self._checks.append((job, dep_job_id, dep_status, result))
435

  
436
  def CountPendingResults(self):
437
    return len(self._checks)
438

  
439
  def CountWaitingJobs(self):
440
    return len(self._waiting)
441

  
442
  def GetNextNotification(self):
443
    return self._notifications.pop(0)
444

  
445
  def JobWaiting(self, job):
446
    return job in self._waiting
447

  
448
  def CheckAndRegister(self, job, dep_job_id, dep_status):
449
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
450

  
451
    assert exp_job == job
452
    assert exp_dep_job_id == dep_job_id
453
    assert exp_dep_status == dep_status
454

  
455
    (result_status, _) = result
456

  
457
    if result_status == jqueue._JobDependencyManager.WAIT:
458
      self._waiting.add(job)
459
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
460
      self._waiting.remove(job)
461

  
462
    return result
463

  
464
  def NotifyWaiters(self, job_id):
465
    self._notifications.append(job_id)
466

  
467

  
468
class _DisabledFakeDependencyManager:
469
  def JobWaiting(self, _):
470
    return False
471

  
472
  def CheckAndRegister(self, *args):
473
    assert False, "Should not be called"
474

  
475
  def NotifyWaiters(self, _):
476
    pass
477

  
478

  
479
class _FakeQueueForProc:
480
  def __init__(self, depmgr=None):
429 481
    self._acquired = False
430 482
    self._updates = []
431 483
    self._submitted = []
432 484

  
433 485
    self._submit_count = itertools.count(1000)
434 486

  
487
    if depmgr:
488
      self.depmgr = depmgr
489
    else:
490
      self.depmgr = _DisabledFakeDependencyManager()
491

  
435 492
  def IsAcquired(self):
436 493
    return self._acquired
437 494

  
......
960 1017

  
961 1018
    # ... also after being restored
962 1019
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
1020
    # Calling the processor on a finished job should be a no-op
963 1021
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
964 1022
    self.assertRaises(IndexError, queue.GetNextUpdate)
965 1023

  
......
1179 1237
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1180 1238
    self.assertRaises(IndexError, queue.GetNextUpdate)
1181 1239

  
1240
  def testJobDependency(self):
1241
    depmgr = _FakeDependencyManager()
1242
    queue = _FakeQueueForProc(depmgr=depmgr)
1243

  
1244
    self.assertEqual(queue.depmgr, depmgr)
1245

  
1246
    prev_job_id = 22113
1247
    prev_job_id2 = 28102
1248
    job_id = 29929
1249
    ops = [
1250
      opcodes.OpTestDummy(result="Res0", fail=False,
1251
                          depends=[
1252
                            [prev_job_id2, None],
1253
                            [prev_job_id, None],
1254
                            ]),
1255
      opcodes.OpTestDummy(result="Res1", fail=False),
1256
      ]
1257

  
1258
    # Create job
1259
    job = self._CreateJob(queue, job_id, ops)
1260

  
1261
    def _BeforeStart(timeout, priority):
1262
      if attempt == 0 or attempt > 5:
1263
        # Job should only be updated when it wasn't waiting for another job
1264
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1265
      self.assertRaises(IndexError, queue.GetNextUpdate)
1266
      self.assertFalse(queue.IsAcquired())
1267
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1268
      self.assertFalse(job.cur_opctx)
1269

  
1270
    def _AfterStart(op, cbs):
1271
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1272
      self.assertRaises(IndexError, queue.GetNextUpdate)
1273

  
1274
      self.assertFalse(queue.IsAcquired())
1275
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1276
      self.assertFalse(job.cur_opctx)
1277

  
1278
      # Job is running, cancelling shouldn't be possible
1279
      (success, _) = job.Cancel()
1280
      self.assertFalse(success)
1281

  
1282
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1283

  
1284
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1285

  
1286
    counter = itertools.count()
1287
    while True:
1288
      attempt = counter.next()
1289

  
1290
      self.assertRaises(IndexError, queue.GetNextUpdate)
1291
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1292

  
1293
      if attempt < 2:
1294
        depmgr.AddCheckResult(job, prev_job_id2, None,
1295
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1296
      elif attempt == 2:
1297
        depmgr.AddCheckResult(job, prev_job_id2, None,
1298
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1299
        # The processor will ask for the next dependency immediately
1300
        depmgr.AddCheckResult(job, prev_job_id, None,
1301
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1302
      elif attempt < 5:
1303
        depmgr.AddCheckResult(job, prev_job_id, None,
1304
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1305
      elif attempt == 5:
1306
        depmgr.AddCheckResult(job, prev_job_id, None,
1307
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1308
      if attempt == 2:
1309
        self.assertEqual(depmgr.CountPendingResults(), 2)
1310
      elif attempt > 5:
1311
        self.assertEqual(depmgr.CountPendingResults(), 0)
1312
      else:
1313
        self.assertEqual(depmgr.CountPendingResults(), 1)
1314

  
1315
      result = jqueue._JobProcessor(queue, opexec, job)()
1316
      if attempt == 0 or attempt >= 5:
1317
        # Job should only be updated if there was an actual change
1318
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1319
      self.assertRaises(IndexError, queue.GetNextUpdate)
1320
      self.assertFalse(depmgr.CountPendingResults())
1321

  
1322
      if attempt < 5:
1323
        # Simulate waiting for other job
1324
        self.assertTrue(result)
1325
        self.assertTrue(job.cur_opctx)
1326
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1327
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1328
        self.assert_(job.start_timestamp)
1329
        self.assertFalse(job.end_timestamp)
1330
        continue
1331

  
1332
      if result:
1333
        # Last opcode
1334
        self.assertFalse(job.cur_opctx)
1335
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1336
        break
1337

  
1338
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1339

  
1340
      self.assertFalse(result)
1341
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1342
      self.assert_(job.start_timestamp)
1343
      self.assertFalse(job.end_timestamp)
1344

  
1345
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1346
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1347
    self.assertEqual(job.GetInfo(["opresult"]),
1348
                     [[op.input.result for op in job.ops]])
1349
    self.assertEqual(job.GetInfo(["opstatus"]),
1350
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1351
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1352
                               for op in job.ops))
1353

  
1354
    self._GenericCheckJob(job)
1355

  
1356
    self.assertRaises(IndexError, queue.GetNextUpdate)
1357
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1358
    self.assertFalse(depmgr.CountPendingResults())
1359
    self.assertFalse(depmgr.CountWaitingJobs())
1360

  
1361
    # Calling the processor on a finished job should be a no-op
1362
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1363
    self.assertRaises(IndexError, queue.GetNextUpdate)
1364

  
1365
  def testJobDependencyCancel(self):
1366
    depmgr = _FakeDependencyManager()
1367
    queue = _FakeQueueForProc(depmgr=depmgr)
1368

  
1369
    self.assertEqual(queue.depmgr, depmgr)
1370

  
1371
    prev_job_id = 13623
1372
    job_id = 30876
1373
    ops = [
1374
      opcodes.OpTestDummy(result="Res0", fail=False),
1375
      opcodes.OpTestDummy(result="Res1", fail=False,
1376
                          depends=[
1377
                            [prev_job_id, None],
1378
                            ]),
1379
      opcodes.OpTestDummy(result="Res2", fail=False),
1380
      ]
1381

  
1382
    # Create job
1383
    job = self._CreateJob(queue, job_id, ops)
1384

  
1385
    def _BeforeStart(timeout, priority):
1386
      if attempt == 0 or attempt > 5:
1387
        # Job should only be updated when it wasn't waiting for another job
1388
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1389
      self.assertRaises(IndexError, queue.GetNextUpdate)
1390
      self.assertFalse(queue.IsAcquired())
1391
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1392
      self.assertFalse(job.cur_opctx)
1393

  
1394
    def _AfterStart(op, cbs):
1395
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1396
      self.assertRaises(IndexError, queue.GetNextUpdate)
1397

  
1398
      self.assertFalse(queue.IsAcquired())
1399
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1400
      self.assertFalse(job.cur_opctx)
1401

  
1402
      # Job is running, cancelling shouldn't be possible
1403
      (success, _) = job.Cancel()
1404
      self.assertFalse(success)
1405

  
1406
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1407

  
1408
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1409

  
1410
    counter = itertools.count()
1411
    while True:
1412
      attempt = counter.next()
1413

  
1414
      self.assertRaises(IndexError, queue.GetNextUpdate)
1415
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1416

  
1417
      if attempt == 0:
1418
        # This will handle the first opcode
1419
        pass
1420
      elif attempt < 4:
1421
        depmgr.AddCheckResult(job, prev_job_id, None,
1422
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1423
      elif attempt == 4:
1424
        # Other job was cancelled
1425
        depmgr.AddCheckResult(job, prev_job_id, None,
1426
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1427

  
1428
      if attempt == 0:
1429
        self.assertEqual(depmgr.CountPendingResults(), 0)
1430
      else:
1431
        self.assertEqual(depmgr.CountPendingResults(), 1)
1432

  
1433
      result = jqueue._JobProcessor(queue, opexec, job)()
1434
      if attempt <= 1 or attempt >= 4:
1435
        # Job should only be updated if there was an actual change
1436
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1437
      self.assertRaises(IndexError, queue.GetNextUpdate)
1438
      self.assertFalse(depmgr.CountPendingResults())
1439

  
1440
      if attempt > 0 and attempt < 4:
1441
        # Simulate waiting for other job
1442
        self.assertTrue(result)
1443
        self.assertTrue(job.cur_opctx)
1444
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1445
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1446
        self.assert_(job.start_timestamp)
1447
        self.assertFalse(job.end_timestamp)
1448
        continue
1449

  
1450
      if result:
1451
        # Last opcode
1452
        self.assertFalse(job.cur_opctx)
1453
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1454
        break
1455

  
1456
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1457

  
1458
      self.assertFalse(result)
1459
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1460
      self.assert_(job.start_timestamp)
1461
      self.assertFalse(job.end_timestamp)
1462

  
1463
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1464
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1465
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1466
                     [[constants.OP_STATUS_SUCCESS,
1467
                       constants.OP_STATUS_CANCELED,
1468
                       constants.OP_STATUS_CANCELED],
1469
                      ["Res0", "Job canceled by request",
1470
                       "Job canceled by request"]])
1471

  
1472
    self._GenericCheckJob(job)
1473

  
1474
    self.assertRaises(IndexError, queue.GetNextUpdate)
1475
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1476
    self.assertFalse(depmgr.CountPendingResults())
1477

  
1478
    # Calling the processor on a finished job should be a no-op
1479
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1480
    self.assertRaises(IndexError, queue.GetNextUpdate)
1481

  
1482
  def testJobDependencyWrongstatus(self):
1483
    depmgr = _FakeDependencyManager()
1484
    queue = _FakeQueueForProc(depmgr=depmgr)
1485

  
1486
    self.assertEqual(queue.depmgr, depmgr)
1487

  
1488
    prev_job_id = 9741
1489
    job_id = 11763
1490
    ops = [
1491
      opcodes.OpTestDummy(result="Res0", fail=False),
1492
      opcodes.OpTestDummy(result="Res1", fail=False,
1493
                          depends=[
1494
                            [prev_job_id, None],
1495
                            ]),
1496
      opcodes.OpTestDummy(result="Res2", fail=False),
1497
      ]
1498

  
1499
    # Create job
1500
    job = self._CreateJob(queue, job_id, ops)
1501

  
1502
    def _BeforeStart(timeout, priority):
1503
      if attempt == 0 or attempt > 5:
1504
        # Job should only be updated when it wasn't waiting for another job
1505
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1506
      self.assertRaises(IndexError, queue.GetNextUpdate)
1507
      self.assertFalse(queue.IsAcquired())
1508
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1509
      self.assertFalse(job.cur_opctx)
1510

  
1511
    def _AfterStart(op, cbs):
1512
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1513
      self.assertRaises(IndexError, queue.GetNextUpdate)
1514

  
1515
      self.assertFalse(queue.IsAcquired())
1516
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1517
      self.assertFalse(job.cur_opctx)
1518

  
1519
      # Job is running, cancelling shouldn't be possible
1520
      (success, _) = job.Cancel()
1521
      self.assertFalse(success)
1522

  
1523
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1524

  
1525
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1526

  
1527
    counter = itertools.count()
1528
    while True:
1529
      attempt = counter.next()
1530

  
1531
      self.assertRaises(IndexError, queue.GetNextUpdate)
1532
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1533

  
1534
      if attempt == 0:
1535
        # This will handle the first opcode
1536
        pass
1537
      elif attempt < 4:
1538
        depmgr.AddCheckResult(job, prev_job_id, None,
1539
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1540
      elif attempt == 4:
1541
        # Other job failed
1542
        depmgr.AddCheckResult(job, prev_job_id, None,
1543
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1544

  
1545
      if attempt == 0:
1546
        self.assertEqual(depmgr.CountPendingResults(), 0)
1547
      else:
1548
        self.assertEqual(depmgr.CountPendingResults(), 1)
1549

  
1550
      result = jqueue._JobProcessor(queue, opexec, job)()
1551
      if attempt <= 1 or attempt >= 4:
1552
        # Job should only be updated if there was an actual change
1553
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1554
      self.assertRaises(IndexError, queue.GetNextUpdate)
1555
      self.assertFalse(depmgr.CountPendingResults())
1556

  
1557
      if attempt > 0 and attempt < 4:
1558
        # Simulate waiting for other job
1559
        self.assertTrue(result)
1560
        self.assertTrue(job.cur_opctx)
1561
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1562
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1563
        self.assert_(job.start_timestamp)
1564
        self.assertFalse(job.end_timestamp)
1565
        continue
1566

  
1567
      if result:
1568
        # Last opcode
1569
        self.assertFalse(job.cur_opctx)
1570
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1571
        break
1572

  
1573
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1574

  
1575
      self.assertFalse(result)
1576
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1577
      self.assert_(job.start_timestamp)
1578
      self.assertFalse(job.end_timestamp)
1579

  
1580
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1581
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1582
    self.assertEqual(job.GetInfo(["opstatus"]),
1583
                     [[constants.OP_STATUS_SUCCESS,
1584
                       constants.OP_STATUS_ERROR,
1585
                       constants.OP_STATUS_ERROR]]),
1586

  
1587
    (opresult, ) = job.GetInfo(["opresult"])
1588
    self.assertEqual(len(opresult), len(ops))
1589
    self.assertEqual(opresult[0], "Res0")
1590
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1591
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1592

  
1593
    self._GenericCheckJob(job)
1594

  
1595
    self.assertRaises(IndexError, queue.GetNextUpdate)
1596
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1597
    self.assertFalse(depmgr.CountPendingResults())
1598

  
1599
    # Calling the processor on a finished job should be a no-op
1600
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1601
    self.assertRaises(IndexError, queue.GetNextUpdate)
1602

  
1182 1603

  
1183 1604
class _FakeTimeoutStrategy:
1184 1605
  def __init__(self, timeouts):
......
1412 1833
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1413 1834

  
1414 1835

  
1836
class TestJobDependencyManager(unittest.TestCase):
1837
  class _FakeJob:
1838
    def __init__(self, job_id):
1839
      self.id = str(job_id)
1840

  
1841
  def setUp(self):
1842
    self._status = []
1843
    self._queue = []
1844
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1845

  
1846
  def _GetStatus(self, job_id):
1847
    (exp_job_id, result) = self._status.pop(0)
1848
    self.assertEqual(exp_job_id, job_id)
1849
    return result
1850

  
1851
  def _Enqueue(self, jobs):
1852
    self._queue.append(jobs)
1853

  
1854
  def testNotFinalizedThenCancel(self):
1855
    job = self._FakeJob(17697)
1856
    job_id = str(28625)
1857

  
1858
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1859
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1860
    self.assertEqual(result, self.jdm.WAIT)
1861
    self.assertFalse(self._status)
1862
    self.assertFalse(self._queue)
1863
    self.assertTrue(self.jdm.JobWaiting(job))
1864
    self.assertEqual(self.jdm._waiters, {
1865
      job_id: set([job]),
1866
      })
1867

  
1868
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1869
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1870
    self.assertEqual(result, self.jdm.CANCEL)
1871
    self.assertFalse(self._status)
1872
    self.assertFalse(self._queue)
1873
    self.assertFalse(self.jdm.JobWaiting(job))
1874

  
1875
  def testRequireCancel(self):
1876
    job = self._FakeJob(5278)
1877
    job_id = str(9610)
1878
    dep_status = [constants.JOB_STATUS_CANCELED]
1879

  
1880
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1881
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1882
    self.assertEqual(result, self.jdm.WAIT)
1883
    self.assertFalse(self._status)
1884
    self.assertFalse(self._queue)
1885
    self.assertTrue(self.jdm.JobWaiting(job))
1886
    self.assertEqual(self.jdm._waiters, {
1887
      job_id: set([job]),
1888
      })
1889

  
1890
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1891
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1892
    self.assertEqual(result, self.jdm.CONTINUE)
1893
    self.assertFalse(self._status)
1894
    self.assertFalse(self._queue)
1895
    self.assertFalse(self.jdm.JobWaiting(job))
1896

  
1897
  def testRequireError(self):
1898
    job = self._FakeJob(21459)
1899
    job_id = str(25519)
1900
    dep_status = [constants.JOB_STATUS_ERROR]
1901

  
1902
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1903
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1904
    self.assertEqual(result, self.jdm.WAIT)
1905
    self.assertFalse(self._status)
1906
    self.assertFalse(self._queue)
1907
    self.assertTrue(self.jdm.JobWaiting(job))
1908
    self.assertEqual(self.jdm._waiters, {
1909
      job_id: set([job]),
1910
      })
1911

  
1912
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1913
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1914
    self.assertEqual(result, self.jdm.CONTINUE)
1915
    self.assertFalse(self._status)
1916
    self.assertFalse(self._queue)
1917
    self.assertFalse(self.jdm.JobWaiting(job))
1918

  
1919
  def testRequireMultiple(self):
1920
    dep_status = list(constants.JOBS_FINALIZED)
1921

  
1922
    for end_status in dep_status:
1923
      job = self._FakeJob(21343)
1924
      job_id = str(14609)
1925

  
1926
      self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1927
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1928
      self.assertEqual(result, self.jdm.WAIT)
1929
      self.assertFalse(self._status)
1930
      self.assertFalse(self._queue)
1931
      self.assertTrue(self.jdm.JobWaiting(job))
1932
      self.assertEqual(self.jdm._waiters, {
1933
        job_id: set([job]),
1934
        })
1935

  
1936
      self._status.append((job_id, end_status))
1937
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1938
      self.assertEqual(result, self.jdm.CONTINUE)
1939
      self.assertFalse(self._status)
1940
      self.assertFalse(self._queue)
1941
      self.assertFalse(self.jdm.JobWaiting(job))
1942

  
1943
  def testNotify(self):
1944
    job = self._FakeJob(8227)
1945
    job_id = str(4113)
1946

  
1947
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1948
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1949
    self.assertEqual(result, self.jdm.WAIT)
1950
    self.assertFalse(self._status)
1951
    self.assertFalse(self._queue)
1952
    self.assertTrue(self.jdm.JobWaiting(job))
1953
    self.assertEqual(self.jdm._waiters, {
1954
      job_id: set([job]),
1955
      })
1956

  
1957
    self.jdm.NotifyWaiters(job_id)
1958
    self.assertFalse(self._status)
1959
    self.assertFalse(self.jdm._waiters)
1960
    self.assertFalse(self.jdm.JobWaiting(job))
1961
    self.assertEqual(self._queue, [set([job])])
1962

  
1963
  def testWrongStatus(self):
1964
    job = self._FakeJob(10102)
1965
    job_id = str(1271)
1966

  
1967
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1968
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1969
                                            [constants.JOB_STATUS_SUCCESS])
1970
    self.assertEqual(result, self.jdm.WAIT)
1971
    self.assertFalse(self._status)
1972
    self.assertFalse(self._queue)
1973
    self.assertTrue(self.jdm.JobWaiting(job))
1974
    self.assertEqual(self.jdm._waiters, {
1975
      job_id: set([job]),
1976
      })
1977

  
1978
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1979
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1980
                                            [constants.JOB_STATUS_SUCCESS])
1981
    self.assertEqual(result, self.jdm.WRONGSTATUS)
1982
    self.assertFalse(self._status)
1983
    self.assertFalse(self._queue)
1984
    self.assertFalse(self.jdm.JobWaiting(job))
1985

  
1986
  def testCorrectStatus(self):
1987
    job = self._FakeJob(24273)
1988
    job_id = str(23885)
1989

  
1990
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1991
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1992
                                            [constants.JOB_STATUS_SUCCESS])
1993
    self.assertEqual(result, self.jdm.WAIT)
1994
    self.assertFalse(self._status)
1995
    self.assertFalse(self._queue)
1996
    self.assertTrue(self.jdm.JobWaiting(job))
1997
    self.assertEqual(self.jdm._waiters, {
1998
      job_id: set([job]),
1999
      })
2000

  
2001
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2002
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2003
                                            [constants.JOB_STATUS_SUCCESS])
2004
    self.assertEqual(result, self.jdm.CONTINUE)
2005
    self.assertFalse(self._status)
2006
    self.assertFalse(self._queue)
2007
    self.assertFalse(self.jdm.JobWaiting(job))
2008

  
2009
  def testFinalizedRightAway(self):
2010
    job = self._FakeJob(224)
2011
    job_id = str(3081)
2012

  
2013
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2014
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2015
                                            [constants.JOB_STATUS_SUCCESS])
2016
    self.assertEqual(result, self.jdm.CONTINUE)
2017
    self.assertFalse(self._status)
2018
    self.assertFalse(self._queue)
2019
    self.assertFalse(self.jdm.JobWaiting(job))
2020
    self.assertEqual(self.jdm._waiters, {
2021
      job_id: set(),
2022
      })
2023

  
2024
    # Force cleanup
2025
    self.jdm.NotifyWaiters("0")
2026
    self.assertFalse(self.jdm._waiters)
2027
    self.assertFalse(self._status)
2028
    self.assertFalse(self._queue)
2029

  
2030
  def testSelfDependency(self):
2031
    job = self._FakeJob(18937)
2032

  
2033
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2034
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2035
    self.assertEqual(result, self.jdm.ERROR)
2036

  
2037
  def testJobDisappears(self):
2038
    job = self._FakeJob(30540)
2039
    job_id = str(23769)
2040

  
2041
    def _FakeStatus(_):
2042
      raise errors.JobLost("#msg#")
2043

  
2044
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2045
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2046
    self.assertEqual(result, self.jdm.ERROR)
2047
    self.assertFalse(jdm.JobWaiting(job))
2048

  
2049

  
1415 2050
if __name__ == "__main__":
1416 2051
  testutils.GanetiTestProgram()

Also available in: Unified diff