Revision f40ae421 test/ganeti.utils_unittest.py
b/test/ganeti.utils_unittest.py | ||
---|---|---|
363 | 363 |
self.failUnlessEqual(RunCmd(["env"], reset_env=True, |
364 | 364 |
env={"FOO": "bar",}).stdout.strip(), "FOO=bar") |
365 | 365 |
|
366 |
def testNoFork(self): |
|
367 |
"""Test that nofork raise an error""" |
|
368 |
assert not utils.no_fork |
|
369 |
utils.no_fork = True |
|
370 |
try: |
|
371 |
self.assertRaises(errors.ProgrammerError, RunCmd, ["true"]) |
|
372 |
finally: |
|
373 |
utils.no_fork = False |
|
374 |
|
|
375 |
def testWrongParams(self): |
|
376 |
"""Test wrong parameters""" |
|
377 |
self.assertRaises(errors.ProgrammerError, RunCmd, ["true"], |
|
378 |
output="/dev/null", interactive=True) |
|
379 |
|
|
366 | 380 |
|
367 |
class TestRunParts(unittest.TestCase):
|
|
381 |
class TestRunParts(testutils.GanetiTestCase):
|
|
368 | 382 |
"""Testing case for the RunParts function""" |
369 | 383 |
|
370 | 384 |
def setUp(self): |
... | ... | |
489 | 503 |
self.failUnlessEqual(runresult.exit_code, 0) |
490 | 504 |
self.failUnless(not runresult.failed) |
491 | 505 |
|
506 |
def testMissingDirectory(self): |
|
507 |
nosuchdir = utils.PathJoin(self.rundir, "no/such/directory") |
|
508 |
self.assertEqual(RunParts(nosuchdir), []) |
|
509 |
|
|
492 | 510 |
|
493 | 511 |
class TestStartDaemon(testutils.GanetiTestCase): |
494 | 512 |
def setUp(self): |
... | ... | |
681 | 699 |
self.fail("File '%s' not removed" % symlink) |
682 | 700 |
|
683 | 701 |
|
702 |
class TestRemoveDir(unittest.TestCase): |
|
703 |
def setUp(self): |
|
704 |
self.tmpdir = tempfile.mkdtemp() |
|
705 |
|
|
706 |
def tearDown(self): |
|
707 |
try: |
|
708 |
shutil.rmtree(self.tmpdir) |
|
709 |
except EnvironmentError: |
|
710 |
pass |
|
711 |
|
|
712 |
def testEmptyDir(self): |
|
713 |
utils.RemoveDir(self.tmpdir) |
|
714 |
self.assertFalse(os.path.isdir(self.tmpdir)) |
|
715 |
|
|
716 |
def testNonEmptyDir(self): |
|
717 |
self.tmpfile = os.path.join(self.tmpdir, "test1") |
|
718 |
open(self.tmpfile, "w").close() |
|
719 |
self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir) |
|
720 |
|
|
721 |
|
|
684 | 722 |
class TestRename(unittest.TestCase): |
685 | 723 |
"""Test case for RenameFile""" |
686 | 724 |
|
... | ... | |
882 | 920 |
datastrict = ReadOneLineFile(myfile, strict=True) |
883 | 921 |
self.assertEqual(myline, datastrict) |
884 | 922 |
|
923 |
def testEmptyfile(self): |
|
924 |
myfile = self._CreateTempFile() |
|
925 |
self.assertRaises(errors.GenericError, ReadOneLineFile, myfile) |
|
926 |
|
|
885 | 927 |
|
886 | 928 |
class TestTimestampForFilename(unittest.TestCase): |
887 | 929 |
def test(self): |
... | ... | |
973 | 1015 |
self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0') |
974 | 1016 |
self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1') |
975 | 1017 |
|
1018 |
def testErrors(self): |
|
1019 |
self.assertRaises(errors.ProgrammerError, FormatUnit, 1, "a") |
|
1020 |
|
|
976 | 1021 |
|
977 | 1022 |
class TestParseUnit(unittest.TestCase): |
978 | 1023 |
"""Test case for the ParseUnit function""" |
... | ... | |
1035 | 1080 |
self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5]) |
1036 | 1081 |
|
1037 | 1082 |
def testInvalidInput(self): |
1038 |
self.assertRaises(errors.ParseError, |
|
1039 |
utils.ParseCpuMask, |
|
1040 |
"garbage") |
|
1041 |
self.assertRaises(errors.ParseError, |
|
1042 |
utils.ParseCpuMask, |
|
1043 |
"0,") |
|
1044 |
self.assertRaises(errors.ParseError, |
|
1045 |
utils.ParseCpuMask, |
|
1046 |
"0-1-2") |
|
1047 |
self.assertRaises(errors.ParseError, |
|
1048 |
utils.ParseCpuMask, |
|
1049 |
"2-1") |
|
1083 |
for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]: |
|
1084 |
self.assertRaises(errors.ParseError, utils.ParseCpuMask, data) |
|
1085 |
|
|
1050 | 1086 |
|
1051 | 1087 |
class TestSshKeys(testutils.GanetiTestCase): |
1052 | 1088 |
"""Test case for the AddAuthorizedKey function""" |
... | ... | |
1523 | 1559 |
|
1524 | 1560 |
class TestForceDictType(unittest.TestCase): |
1525 | 1561 |
"""Test case for ForceDictType""" |
1526 |
|
|
1527 |
def setUp(self): |
|
1528 |
self.key_types = { |
|
1529 |
'a': constants.VTYPE_INT, |
|
1530 |
'b': constants.VTYPE_BOOL, |
|
1531 |
'c': constants.VTYPE_STRING, |
|
1532 |
'd': constants.VTYPE_SIZE, |
|
1533 |
"e": constants.VTYPE_MAYBE_STRING, |
|
1534 |
} |
|
1562 |
KEY_TYPES = { |
|
1563 |
"a": constants.VTYPE_INT, |
|
1564 |
"b": constants.VTYPE_BOOL, |
|
1565 |
"c": constants.VTYPE_STRING, |
|
1566 |
"d": constants.VTYPE_SIZE, |
|
1567 |
"e": constants.VTYPE_MAYBE_STRING, |
|
1568 |
} |
|
1535 | 1569 |
|
1536 | 1570 |
def _fdt(self, dict, allowed_values=None): |
1537 | 1571 |
if allowed_values is None: |
1538 |
utils.ForceDictType(dict, self.key_types)
|
|
1572 |
utils.ForceDictType(dict, self.KEY_TYPES)
|
|
1539 | 1573 |
else: |
1540 |
utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values)
|
|
1574 |
utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
|
|
1541 | 1575 |
|
1542 | 1576 |
return dict |
1543 | 1577 |
|
... | ... | |
1550 | 1584 |
self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''}) |
1551 | 1585 |
self.assertEqual(self._fdt({'b': 'false'}), {'b': False}) |
1552 | 1586 |
self.assertEqual(self._fdt({'b': 'False'}), {'b': False}) |
1587 |
self.assertEqual(self._fdt({'b': False}), {'b': False}) |
|
1553 | 1588 |
self.assertEqual(self._fdt({'b': 'true'}), {'b': True}) |
1554 | 1589 |
self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) |
1555 | 1590 |
self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) |
... | ... | |
1557 | 1592 |
self.assertEqual(self._fdt({"e": None, }), {"e": None, }) |
1558 | 1593 |
self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", }) |
1559 | 1594 |
self.assertEqual(self._fdt({"e": False, }), {"e": '', }) |
1595 |
self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"}) |
|
1560 | 1596 |
|
1561 | 1597 |
def testErrors(self): |
1562 | 1598 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) |
1599 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"}) |
|
1563 | 1600 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) |
1564 | 1601 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) |
1565 | 1602 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) |
1566 | 1603 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), }) |
1567 | 1604 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], }) |
1605 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, }) |
|
1606 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, []) |
|
1607 |
self.assertRaises(errors.ProgrammerError, utils.ForceDictType, |
|
1608 |
{"b": "hello"}, {"b": "no-such-type"}) |
|
1568 | 1609 |
|
1569 | 1610 |
|
1570 | 1611 |
class TestIsNormAbsPath(unittest.TestCase): |
... | ... | |
1659 | 1700 |
utils.RunInSeparateProcess, _exc) |
1660 | 1701 |
|
1661 | 1702 |
|
1662 |
class TestFingerprintFile(unittest.TestCase): |
|
1703 |
class TestFingerprintFiles(unittest.TestCase):
|
|
1663 | 1704 |
def setUp(self): |
1664 | 1705 |
self.tmpfile = tempfile.NamedTemporaryFile() |
1706 |
self.tmpfile2 = tempfile.NamedTemporaryFile() |
|
1707 |
utils.WriteFile(self.tmpfile2.name, data="Hello World\n") |
|
1708 |
self.results = { |
|
1709 |
self.tmpfile.name: "da39a3ee5e6b4b0d3255bfef95601890afd80709", |
|
1710 |
self.tmpfile2.name: "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a", |
|
1711 |
} |
|
1665 | 1712 |
|
1666 |
def test(self): |
|
1713 |
def testSingleFile(self):
|
|
1667 | 1714 |
self.assertEqual(utils._FingerprintFile(self.tmpfile.name), |
1668 |
"da39a3ee5e6b4b0d3255bfef95601890afd80709") |
|
1715 |
self.results[self.tmpfile.name]) |
|
1716 |
|
|
1717 |
self.assertEqual(utils._FingerprintFile("/no/such/file"), None) |
|
1669 | 1718 |
|
1670 |
utils.WriteFile(self.tmpfile.name, data="Hello World\n") |
|
1719 |
def testBigFile(self): |
|
1720 |
self.tmpfile.write("A" * 8192) |
|
1721 |
self.tmpfile.flush() |
|
1671 | 1722 |
self.assertEqual(utils._FingerprintFile(self.tmpfile.name), |
1672 |
"648a6a6ffffdaa0badb23b8baf90b6168dd16b3a") |
|
1723 |
"35b6795ca20d6dc0aff8c7c110c96cd1070b8c38") |
|
1724 |
|
|
1725 |
def testMultiple(self): |
|
1726 |
all_files = self.results.keys() |
|
1727 |
all_files.append("/no/such/file") |
|
1728 |
self.assertEqual(utils.FingerprintFiles(self.results.keys()), self.results) |
|
1673 | 1729 |
|
1674 | 1730 |
|
1675 | 1731 |
class TestUnescapeAndSplit(unittest.TestCase): |
... | ... | |
2414 | 2470 |
# this doesn't raise, since we passed None |
2415 | 2471 |
utils.SafeWriteFile(name, None, data="") |
2416 | 2472 |
|
2473 |
def testError(self): |
|
2474 |
t = tempfile.NamedTemporaryFile() |
|
2475 |
self.assertRaises(errors.ProgrammerError, utils.GetFileID, |
|
2476 |
path=t.name, fd=t.fileno()) |
|
2477 |
|
|
2417 | 2478 |
|
2418 | 2479 |
class TimeMock: |
2419 | 2480 |
def __init__(self, values): |
... | ... | |
2451 | 2512 |
self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True) |
2452 | 2513 |
|
2453 | 2514 |
|
2515 |
class TestTryConvert(unittest.TestCase): |
|
2516 |
def test(self): |
|
2517 |
for src, fn, result in [ |
|
2518 |
("1", int, 1), |
|
2519 |
("a", int, "a"), |
|
2520 |
("", bool, False), |
|
2521 |
("a", bool, True), |
|
2522 |
]: |
|
2523 |
self.assertEqual(utils.TryConvert(fn, src), result) |
|
2524 |
|
|
2525 |
|
|
2526 |
class TestIsValidShellParam(unittest.TestCase): |
|
2527 |
def test(self): |
|
2528 |
for val, result in [ |
|
2529 |
("abc", True), |
|
2530 |
("ab;cd", False), |
|
2531 |
]: |
|
2532 |
self.assertEqual(utils.IsValidShellParam(val), result) |
|
2533 |
|
|
2534 |
|
|
2535 |
class TestBuildShellCmd(unittest.TestCase): |
|
2536 |
def test(self): |
|
2537 |
self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd, |
|
2538 |
"ls %s", "ab;cd") |
|
2539 |
self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab") |
|
2540 |
|
|
2541 |
|
|
2542 |
class TestWriteFile(unittest.TestCase): |
|
2543 |
def setUp(self): |
|
2544 |
self.tfile = tempfile.NamedTemporaryFile() |
|
2545 |
self.did_pre = False |
|
2546 |
self.did_post = False |
|
2547 |
self.did_write = False |
|
2548 |
|
|
2549 |
def markPre(self, fd): |
|
2550 |
self.did_pre = True |
|
2551 |
|
|
2552 |
def markPost(self, fd): |
|
2553 |
self.did_post = True |
|
2554 |
|
|
2555 |
def markWrite(self, fd): |
|
2556 |
self.did_write = True |
|
2557 |
|
|
2558 |
def testWrite(self): |
|
2559 |
data = "abc" |
|
2560 |
utils.WriteFile(self.tfile.name, data=data) |
|
2561 |
self.assertEqual(utils.ReadFile(self.tfile.name), data) |
|
2562 |
|
|
2563 |
def testErrors(self): |
|
2564 |
self.assertRaises(errors.ProgrammerError, utils.WriteFile, |
|
2565 |
self.tfile.name, data="test", fn=lambda fd: None) |
|
2566 |
self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name) |
|
2567 |
self.assertRaises(errors.ProgrammerError, utils.WriteFile, |
|
2568 |
self.tfile.name, data="test", atime=0) |
|
2569 |
|
|
2570 |
def testCalls(self): |
|
2571 |
utils.WriteFile(self.tfile.name, fn=self.markWrite, |
|
2572 |
prewrite=self.markPre, postwrite=self.markPost) |
|
2573 |
self.assertTrue(self.did_pre) |
|
2574 |
self.assertTrue(self.did_post) |
|
2575 |
self.assertTrue(self.did_write) |
|
2576 |
|
|
2577 |
def testDryRun(self): |
|
2578 |
orig = "abc" |
|
2579 |
self.tfile.write(orig) |
|
2580 |
self.tfile.flush() |
|
2581 |
utils.WriteFile(self.tfile.name, data="hello", dry_run=True) |
|
2582 |
self.assertEqual(utils.ReadFile(self.tfile.name), orig) |
|
2583 |
|
|
2584 |
def testTimes(self): |
|
2585 |
f = self.tfile.name |
|
2586 |
for at, mt in [(0, 0), (1000, 1000), (2000, 3000), |
|
2587 |
(int(time.time()), 5000)]: |
|
2588 |
utils.WriteFile(f, data="hello", atime=at, mtime=mt) |
|
2589 |
st = os.stat(f) |
|
2590 |
self.assertEqual(st.st_atime, at) |
|
2591 |
self.assertEqual(st.st_mtime, mt) |
|
2592 |
|
|
2593 |
|
|
2594 |
def testNoClose(self): |
|
2595 |
data = "hello" |
|
2596 |
self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None) |
|
2597 |
fd = utils.WriteFile(self.tfile.name, data=data, close=False) |
|
2598 |
try: |
|
2599 |
os.lseek(fd, 0, 0) |
|
2600 |
self.assertEqual(os.read(fd, 4096), data) |
|
2601 |
finally: |
|
2602 |
os.close(fd) |
|
2603 |
|
|
2604 |
|
|
2605 |
class TestNormalizeAndValidateMac(unittest.TestCase): |
|
2606 |
def testInvalid(self): |
|
2607 |
self.assertRaises(errors.OpPrereqError, |
|
2608 |
utils.NormalizeAndValidateMac, "xxx") |
|
2609 |
|
|
2610 |
def testNormalization(self): |
|
2611 |
for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]: |
|
2612 |
self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower()) |
|
2613 |
|
|
2614 |
|
|
2454 | 2615 |
if __name__ == '__main__': |
2455 | 2616 |
testutils.GanetiTestProgram() |
Also available in: Unified diff