X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f0990e0cf0e78b5df61b9d4bb38a3fa7ea77ec5e..a8ee728bd6264178673583f6b3893491a827bef7:/test/ganeti.utils_unittest.py diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 4b081a0..d57e0c0 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -26,6 +26,7 @@ import os import time import tempfile import os.path +import os import md5 import socket import shutil @@ -89,6 +90,21 @@ class TestIsProcessAlive(unittest.TestCase): class TestLocking(unittest.TestCase): """Testing case for the Lock/Unlock functions""" + + def setUp(self): + lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.", + suffix=".locking") + self.old_lock_dir = constants.LOCK_DIR + constants.LOCK_DIR = lock_dir + + def tearDown(self): + try: + ganeti.utils.Unlock("unittest") + except LockError: + pass + shutil.rmtree(constants.LOCK_DIR, ignore_errors=True) + constants.LOCK_DIR = self.old_lock_dir + def clean_lock(self, name): try: ganeti.utils.Unlock("unittest") @@ -106,7 +122,6 @@ class TestLocking(unittest.TestCase): ganeti.utils.Lock("unittest") self.assertEqual(None, Unlock("unittest")) - def testDoubleLock(self): self.clean_lock("unittest") ganeti.utils.Lock("unittest") @@ -335,7 +350,8 @@ class TestParseUnit(unittest.TestCase): for sep in ('', ' ', ' ', "\t", "\t "): for suffix, scale in TestParseUnit.SCALES: for func in (lambda x: x, str.lower, str.upper): - self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale) + self.assertEqual(ParseUnit('1024' + sep + func(suffix)), + 1024 * scale) def testInvalidInput(self): for sep in ('-', '_', ',', 'a'): @@ -353,171 +369,147 @@ class TestSshKeys(GanetiTestCase): KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" ' 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') - def writeTestFile(self): - (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test') - f = os.fdopen(fd, 'w') + def setUp(self): + (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') try: - f.write(TestSshKeys.KEY_A) - f.write("\n") - f.write(TestSshKeys.KEY_B) - f.write("\n") - finally: - f.close() + handle = os.fdopen(fd, 'w') + try: + handle.write("%s\n" % TestSshKeys.KEY_A) + handle.write("%s\n" % TestSshKeys.KEY_B) + finally: + handle.close() + except: + utils.RemoveFile(self.tmpname) + raise - return tmpname + def tearDown(self): + utils.RemoveFile(self.tmpname) + del self.tmpname def testAddingNewKey(self): - tmpname = self.writeTestFile() - try: - AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') + AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') - self.assertFileContent(tmpname, - "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" - "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" + "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n") - def testAddingAlmostButNotCompletlyTheSameKey(self): - tmpname = self.writeTestFile() - try: - AddAuthorizedKey(tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test') - - self.assertFileContent(tmpname, - "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" - "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n") - finally: - os.unlink(tmpname) + def testAddingAlmostButNotCompletelyTheSameKey(self): + AddAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test') + + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n") def testAddingExistingKeyWithSomeMoreSpaces(self): - tmpname = self.writeTestFile() - try: - AddAuthorizedKey(tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') + AddAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') - self.assertFileContent(tmpname, - "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testRemovingExistingKeyWithSomeMoreSpaces(self): - tmpname = self.writeTestFile() - try: - RemoveAuthorizedKey(tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') + RemoveAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') - self.assertFileContent(tmpname, - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testRemovingNonExistingKey(self): - tmpname = self.writeTestFile() - try: - RemoveAuthorizedKey(tmpname, - 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test') + RemoveAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test') - self.assertFileContent(tmpname, - "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") class TestEtcHosts(GanetiTestCase): """Test functions modifying /etc/hosts""" - def writeTestFile(self): - (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test') - f = os.fdopen(fd, 'w') + def setUp(self): + (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') try: - f.write('# This is a test file for /etc/hosts\n') - f.write('127.0.0.1\tlocalhost\n') - f.write('192.168.1.1 router gw\n') - finally: - f.close() + handle = os.fdopen(fd, 'w') + try: + handle.write('# This is a test file for /etc/hosts\n') + handle.write('127.0.0.1\tlocalhost\n') + handle.write('192.168.1.1 router gw\n') + finally: + handle.close() + except: + utils.RemoveFile(self.tmpname) + raise - return tmpname + def tearDown(self): + utils.RemoveFile(self.tmpname) + del self.tmpname def testSettingNewIp(self): - tmpname = self.writeTestFile() - try: - SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) + SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) - self.assertFileContent(tmpname, - "# This is a test file for /etc/hosts\n" - "127.0.0.1\tlocalhost\n" - "192.168.1.1 router gw\n" - "1.2.3.4\tmyhost.domain.tld myhost\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router gw\n" + "1.2.3.4\tmyhost.domain.tld myhost\n") def testSettingExistingIp(self): - tmpname = self.writeTestFile() - try: - SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost']) + SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld', + ['myhost']) - self.assertFileContent(tmpname, - "# This is a test file for /etc/hosts\n" - "127.0.0.1\tlocalhost\n" - "192.168.1.1\tmyhost.domain.tld myhost\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1\tmyhost.domain.tld myhost\n") + + def testSettingDuplicateName(self): + SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost']) + + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router gw\n" + "1.2.3.4\tmyhost\n") def testRemovingExistingHost(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'router') + RemoveEtcHostsEntry(self.tmpname, 'router') - self.assertFileContent(tmpname, - "# This is a test file for /etc/hosts\n" - "127.0.0.1\tlocalhost\n" - "192.168.1.1 gw\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 gw\n") def testRemovingSingleExistingHost(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'localhost') + RemoveEtcHostsEntry(self.tmpname, 'localhost') - self.assertFileContent(tmpname, - "# This is a test file for /etc/hosts\n" - "192.168.1.1 router gw\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "192.168.1.1 router gw\n") def testRemovingNonExistingHost(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'myhost') + RemoveEtcHostsEntry(self.tmpname, 'myhost') - self.assertFileContent(tmpname, - "# This is a test file for /etc/hosts\n" - "127.0.0.1\tlocalhost\n" - "192.168.1.1 router gw\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router gw\n") def testRemovingAlias(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'gw') + RemoveEtcHostsEntry(self.tmpname, 'gw') - self.assertFileContent(tmpname, - "# This is a test file for /etc/hosts\n" - "127.0.0.1\tlocalhost\n" - "192.168.1.1 router\n") - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router\n") class TestShellQuoting(unittest.TestCase): @@ -552,12 +544,20 @@ class TestTcpPing(unittest.TestCase): def testTcpPingToLocalHostAccept(self): self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, - constants.LOCALHOST_IP_ADDRESS, self.listenerport, timeout=10, - live_port_needed=True), + live_port_needed=True, + source=constants.LOCALHOST_IP_ADDRESS, + ), "failed to connect to test listener") + self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, + self.listenerport, + timeout=10, + live_port_needed=True, + ), + "failed to connect to test listener (no source)") + class TestTcpPingDeaf(unittest.TestCase): """Testcase for TCP version of ping - against non listen(2)ing port""" @@ -573,20 +573,36 @@ class TestTcpPingDeaf(unittest.TestCase): def testTcpPingToLocalHostAcceptDeaf(self): self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS, - constants.LOCALHOST_IP_ADDRESS, self.deaflistenerport, timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=True), # need successful connect(2) + live_port_needed=True, + source=constants.LOCALHOST_IP_ADDRESS, + ), # need successful connect(2) "successfully connected to deaf listener") + self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS, + self.deaflistenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=True, + ), # need successful connect(2) + "successfully connected to deaf listener (no source addr)") + def testTcpPingToLocalHostNoAccept(self): self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, - constants.LOCALHOST_IP_ADDRESS, self.deaflistenerport, timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=False), # ECONNREFUSED is OK + live_port_needed=False, + source=constants.LOCALHOST_IP_ADDRESS, + ), # ECONNREFUSED is OK "failed to ping alive host on deaf port") + self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, + self.deaflistenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=False, + ), # ECONNREFUSED is OK + "failed to ping alive host on deaf port (no source addr)") + class TestListVisibleFiles(unittest.TestCase): """Test case for ListVisibleFiles""" @@ -640,5 +656,28 @@ class TestNewUUID(unittest.TestCase): self.failUnless(self._re_uuid.match(utils.NewUUID())) +class TestUniqueSequence(unittest.TestCase): + """Test case for UniqueSequence""" + + def _test(self, input, expected): + self.assertEqual(utils.UniqueSequence(input), expected) + + def runTest(self): + # Ordered input + self._test([1, 2, 3], [1, 2, 3]) + self._test([1, 1, 2, 2, 3, 3], [1, 2, 3]) + self._test([1, 2, 2, 3], [1, 2, 3]) + self._test([1, 2, 3, 3], [1, 2, 3]) + + # Unordered input + self._test([1, 2, 3, 1, 2, 3], [1, 2, 3]) + self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3]) + + # Strings + self._test(["a", "a"], ["a"]) + self._test(["a", "b"], ["a", "b"]) + self._test(["a", "b", "a"], ["a", "b"]) + + if __name__ == '__main__': unittest.main()