X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2632795d94acd2703a420350e28cf30a26321cf5..2e6469a14ae2b2c7b547076efe3fdc8eacae6050:/test/ganeti.utils_unittest.py diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 6d489b2..df5ef20 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,43 +21,34 @@ """Script for unittesting the utils module""" -import unittest +import distutils.version +import errno +import fcntl +import glob import os -import time -import tempfile import os.path -import os -import stat +import re +import shutil import signal import socket -import shutil -import re -import select +import stat import string -import fcntl -import OpenSSL +import tempfile +import time +import unittest import warnings -import distutils.version -import glob -import errno +import OpenSSL +from cStringIO import StringIO -import ganeti import testutils from ganeti import constants from ganeti import compat from ganeti import utils from ganeti import errors -from ganeti import serializer -from ganeti.utils import IsProcessAlive, RunCmd, \ - RemoveFile, MatchNameComponent, FormatUnit, \ - ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \ - ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \ - SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \ - TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \ - UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile - -from ganeti.errors import LockError, UnitParseError, GenericError, \ - ProgrammerError, OpPrereqError +from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \ + ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \ + TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \ + ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry class TestIsProcessAlive(unittest.TestCase): @@ -65,8 +56,7 @@ class TestIsProcessAlive(unittest.TestCase): def testExists(self): mypid = os.getpid() - self.assert_(IsProcessAlive(mypid), - "can't find myself running") + self.assert_(utils.IsProcessAlive(mypid), "can't find myself running") def testNotExisting(self): pid_non_existing = os.fork() @@ -75,7 +65,7 @@ class TestIsProcessAlive(unittest.TestCase): elif pid_non_existing < 0: raise SystemError("can't fork") os.waitpid(pid_non_existing, 0) - self.assertFalse(IsProcessAlive(pid_non_existing), + self.assertFalse(utils.IsProcessAlive(pid_non_existing), "nonexisting process detected") @@ -121,7 +111,7 @@ class TestIsProcessHandlingSignal(unittest.TestCase): self.assertEqual(result, value.strip()) def test(self): - sp = utils.PathJoin(self.tmpdir, "status") + sp = PathJoin(self.tmpdir, "status") utils.WriteFile(sp, data="\n".join([ "Name: bash", @@ -139,7 +129,7 @@ class TestIsProcessHandlingSignal(unittest.TestCase): self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp)) def testNoSigCgt(self): - sp = utils.PathJoin(self.tmpdir, "status") + sp = PathJoin(self.tmpdir, "status") utils.WriteFile(sp, data="\n".join([ "Name: bash", @@ -149,7 +139,7 @@ class TestIsProcessHandlingSignal(unittest.TestCase): 1234, 10, status_path=sp) def testNoSuchFile(self): - sp = utils.PathJoin(self.tmpdir, "notexist") + sp = PathJoin(self.tmpdir, "notexist") self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp)) @@ -193,7 +183,7 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, os.getpid()) self.failUnless(utils.IsProcessAlive(read_pid)) - self.failUnlessRaises(GenericError, utils.WritePidFile, 'test') + self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test') utils.RemovePidFile('test') self.failIf(os.path.exists(pid_file), "PID file should not exist anymore") @@ -227,7 +217,7 @@ class TestPidFileFunctions(unittest.TestCase): utils.KillProcess(new_pid, waitpid=True) self.failIf(utils.IsProcessAlive(new_pid)) utils.RemovePidFile('child') - self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0) + self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0) def tearDown(self): for name in os.listdir(self.dir): @@ -881,7 +871,7 @@ class TestCreateBackup(testutils.GanetiTestCase): shutil.rmtree(self.tmpdir) def testEmpty(self): - filename = utils.PathJoin(self.tmpdir, "config.data") + filename = PathJoin(self.tmpdir, "config.data") utils.WriteFile(filename, data="") bname = utils.CreateBackup(filename) self.assertFileContent(bname, "") @@ -891,7 +881,7 @@ class TestCreateBackup(testutils.GanetiTestCase): utils.CreateBackup(filename) self.assertEqual(len(glob.glob("%s*" % filename)), 4) - fifoname = utils.PathJoin(self.tmpdir, "fifo") + fifoname = PathJoin(self.tmpdir, "fifo") os.mkfifo(fifoname) self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname) @@ -901,7 +891,7 @@ class TestCreateBackup(testutils.GanetiTestCase): for rep in [1, 2, 10, 127]: testdata = data * rep - filename = utils.PathJoin(self.tmpdir, "test.data_") + filename = PathJoin(self.tmpdir, "test.data_") utils.WriteFile(filename, data=testdata) self.assertFileContent(filename, testdata) @@ -953,6 +943,7 @@ class TestFormatUnit(unittest.TestCase): self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0') self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1') + class TestParseUnit(unittest.TestCase): """Test case for the ParseUnit function""" @@ -999,17 +990,39 @@ class TestParseUnit(unittest.TestCase): def testInvalidInput(self): for sep in ('-', '_', ',', 'a'): for suffix, _ in TestParseUnit.SCALES: - self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix) + self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix) for suffix, _ in TestParseUnit.SCALES: - self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix) + self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix) + + +class TestParseCpuMask(unittest.TestCase): + """Test case for the ParseCpuMask function.""" + def testWellFormed(self): + self.assertEqual(utils.ParseCpuMask(""), []) + self.assertEqual(utils.ParseCpuMask("1"), [1]) + self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5]) + + def testInvalidInput(self): + self.assertRaises(errors.ParseError, + utils.ParseCpuMask, + "garbage") + self.assertRaises(errors.ParseError, + utils.ParseCpuMask, + "0,") + self.assertRaises(errors.ParseError, + utils.ParseCpuMask, + "0-1-2") + self.assertRaises(errors.ParseError, + utils.ParseCpuMask, + "2-1") class TestSshKeys(testutils.GanetiTestCase): """Test case for the AddAuthorizedKey function""" KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a' - KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" ' + KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" ' 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') def setUp(self): @@ -1023,48 +1036,49 @@ class TestSshKeys(testutils.GanetiTestCase): handle.close() def testAddingNewKey(self): - AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') + utils.AddAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n") def testAddingAlmostButNotCompletelyTheSameKey(self): - AddAuthorizedKey(self.tmpname, + utils.AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test') self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n") def testAddingExistingKeyWithSomeMoreSpaces(self): - AddAuthorizedKey(self.tmpname, + utils.AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testRemovingExistingKeyWithSomeMoreSpaces(self): - RemoveAuthorizedKey(self.tmpname, + utils.RemoveAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') self.assertFileContent(self.tmpname, - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testRemovingNonExistingKey(self): - RemoveAuthorizedKey(self.tmpname, + utils.RemoveAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test') self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" - 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") @@ -1078,38 +1092,39 @@ class TestEtcHosts(testutils.GanetiTestCase): try: handle.write('# This is a test file for /etc/hosts\n') handle.write('127.0.0.1\tlocalhost\n') - handle.write('192.168.1.1 router gw\n') + handle.write('192.0.2.1 router gw\n') finally: handle.close() def testSettingNewIp(self): - SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) + SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com', + ['myhost']) self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" - "192.168.1.1 router gw\n" - "1.2.3.4\tmyhost.domain.tld myhost\n") + "192.0.2.1 router gw\n" + "198.51.100.4\tmyhost.example.com myhost\n") self.assertFileMode(self.tmpname, 0644) def testSettingExistingIp(self): - SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld', + SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com', ['myhost']) self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" - "192.168.1.1\tmyhost.domain.tld myhost\n") + "192.0.2.1\tmyhost.example.com myhost\n") self.assertFileMode(self.tmpname, 0644) def testSettingDuplicateName(self): - SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost']) + SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost']) self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" - "192.168.1.1 router gw\n" - "1.2.3.4\tmyhost\n") + "192.0.2.1 router gw\n" + "198.51.100.4\tmyhost\n") self.assertFileMode(self.tmpname, 0644) def testRemovingExistingHost(self): @@ -1118,7 +1133,7 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" - "192.168.1.1 gw\n") + "192.0.2.1 gw\n") self.assertFileMode(self.tmpname, 0644) def testRemovingSingleExistingHost(self): @@ -1126,7 +1141,7 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" - "192.168.1.1 router gw\n") + "192.0.2.1 router gw\n") self.assertFileMode(self.tmpname, 0644) def testRemovingNonExistingHost(self): @@ -1135,7 +1150,7 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" - "192.168.1.1 router gw\n") + "192.0.2.1 router gw\n") self.assertFileMode(self.tmpname, 0644) def testRemovingAlias(self): @@ -1144,10 +1159,31 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" - "192.168.1.1 router\n") + "192.0.2.1 router\n") self.assertFileMode(self.tmpname, 0644) +class TestGetMounts(unittest.TestCase): + """Test case for GetMounts().""" + + TESTDATA = ( + "rootfs / rootfs rw 0 0\n" + "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n" + "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n") + + def setUp(self): + self.tmpfile = tempfile.NamedTemporaryFile() + utils.WriteFile(self.tmpfile.name, data=self.TESTDATA) + + def testGetMounts(self): + self.assertEqual(utils.GetMounts(filename=self.tmpfile.name), + [ + ("rootfs", "/", "rootfs", "rw"), + ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"), + ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"), + ]) + + class TestShellQuoting(unittest.TestCase): """Test case for shell quoting functions""" @@ -1164,167 +1200,6 @@ class TestShellQuoting(unittest.TestCase): self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c") -class TestTcpPing(unittest.TestCase): - """Testcase for TCP version of ping - against listen(2)ing port""" - - def setUp(self): - self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0)) - self.listenerport = self.listener.getsockname()[1] - self.listener.listen(1) - - def tearDown(self): - self.listener.shutdown(socket.SHUT_RDWR) - del self.listener - del self.listenerport - - def testTcpPingToLocalHostAccept(self): - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.listenerport, - timeout=10, - live_port_needed=True, - source=constants.IP4_ADDRESS_LOCALHOST, - ), - "failed to connect to test listener") - - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.listenerport, - timeout=10, - live_port_needed=True, - ), - "failed to connect to test listener (no source)") - - -class TestTcpPingDeaf(unittest.TestCase): - """Testcase for TCP version of ping - against non listen(2)ing port""" - - def setUp(self): - self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0)) - self.deaflistenerport = self.deaflistener.getsockname()[1] - - def tearDown(self): - del self.deaflistener - del self.deaflistenerport - - def testTcpPingToLocalHostAcceptDeaf(self): - self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.deaflistenerport, - timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=True, - source=constants.IP4_ADDRESS_LOCALHOST, - ), # need successful connect(2) - "successfully connected to deaf listener") - - self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.deaflistenerport, - timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=True, - ), # need successful connect(2) - "successfully connected to deaf listener (no source addr)") - - def testTcpPingToLocalHostNoAccept(self): - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.deaflistenerport, - timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=False, - source=constants.IP4_ADDRESS_LOCALHOST, - ), # ECONNREFUSED is OK - "failed to ping alive host on deaf port") - - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.deaflistenerport, - timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=False, - ), # ECONNREFUSED is OK - "failed to ping alive host on deaf port (no source addr)") - - -class TestOwnIpAddress(unittest.TestCase): - """Testcase for OwnIpAddress""" - - def testOwnLoopback(self): - """check having the loopback ip""" - self.failUnless(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST), - "Should own the loopback address") - - def testNowOwnAddress(self): - """check that I don't own an address""" - - # Network 192.0.2.0/24 is reserved for test/documentation as per - # RFC 5735, so we *should* not have an address of this range... if - # this fails, we should extend the test to multiple addresses - DST_IP = "192.0.2.1" - self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP) - - -def _GetSocketCredentials(path): - """Connect to a Unix socket and return remote credentials. - - """ - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - sock.settimeout(10) - sock.connect(path) - return utils.GetSocketCredentials(sock) - finally: - sock.close() - - -class TestGetSocketCredentials(unittest.TestCase): - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - self.sockpath = utils.PathJoin(self.tmpdir, "sock") - - self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.listener.settimeout(10) - self.listener.bind(self.sockpath) - self.listener.listen(1) - - def tearDown(self): - self.listener.shutdown(socket.SHUT_RDWR) - self.listener.close() - shutil.rmtree(self.tmpdir) - - def test(self): - (c2pr, c2pw) = os.pipe() - - # Start child process - child = os.fork() - if child == 0: - try: - data = serializer.DumpJson(_GetSocketCredentials(self.sockpath)) - - os.write(c2pw, data) - os.close(c2pw) - - os._exit(0) - finally: - os._exit(1) - - os.close(c2pw) - - # Wait for one connection - (conn, _) = self.listener.accept() - conn.recv(1) - conn.close() - - # Wait for result - result = os.read(c2pr, 4096) - os.close(c2pr) - - # Check child's exit code - (_, status) = os.waitpid(child, 0) - self.assertFalse(os.WIFSIGNALED(status)) - self.assertEqual(os.WEXITSTATUS(status), 0) - - # Check result - (pid, uid, gid) = serializer.LoadJson(result) - self.assertEqual(pid, os.getpid()) - self.assertEqual(uid, os.getuid()) - self.assertEqual(gid, os.getgid()) - - class TestListVisibleFiles(unittest.TestCase): """Test case for ListVisibleFiles""" @@ -1628,13 +1503,14 @@ class TestForceDictType(unittest.TestCase): 'b': constants.VTYPE_BOOL, 'c': constants.VTYPE_STRING, 'd': constants.VTYPE_SIZE, + "e": constants.VTYPE_MAYBE_STRING, } def _fdt(self, dict, allowed_values=None): if allowed_values is None: - ForceDictType(dict, self.key_types) + utils.ForceDictType(dict, self.key_types) else: - ForceDictType(dict, self.key_types, allowed_values=allowed_values) + utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values) return dict @@ -1651,12 +1527,17 @@ class TestForceDictType(unittest.TestCase): self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) self.assertEqual(self._fdt({'d': '4M'}), {'d': 4}) + self.assertEqual(self._fdt({"e": None, }), {"e": None, }) + self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", }) + self.assertEqual(self._fdt({"e": False, }), {"e": '', }) def testErrors(self): self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), }) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], }) class TestIsNormAbsPath(unittest.TestCase): @@ -1664,10 +1545,10 @@ class TestIsNormAbsPath(unittest.TestCase): def _pathTestHelper(self, path, result): if result: - self.assert_(IsNormAbsPath(path), + self.assert_(utils.IsNormAbsPath(path), "Path %s should result absolute and normalized" % path) else: - self.assertFalse(IsNormAbsPath(path), + self.assertFalse(utils.IsNormAbsPath(path), "Path %s should not result absolute and normalized" % path) def testBase(self): @@ -1859,42 +1740,6 @@ class TestPathJoin(unittest.TestCase): self.failUnlessRaises(ValueError, PathJoin, "/a", "/b") -class TestHostInfo(unittest.TestCase): - """Testing case for HostInfo""" - - def testUppercase(self): - data = "AbC.example.com" - self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower()) - - def testTooLongName(self): - data = "a.b." + "c" * 255 - self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data) - - def testTrailingDot(self): - data = "a.b.c" - self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data) - - def testInvalidName(self): - data = [ - "a b", - "a/b", - ".a.b", - "a..b", - ] - for value in data: - self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value) - - def testValidName(self): - data = [ - "a.b", - "a-b", - "a_b", - "a.b.c", - ] - for value in data: - HostInfo.NormalizeName(value) - - class TestValidateServiceName(unittest.TestCase): def testValid(self): testnames = [ @@ -1918,7 +1763,7 @@ class TestValidateServiceName(unittest.TestCase): ] for name in testnames: - self.assertRaises(OpPrereqError, utils.ValidateServiceName, name) + self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name) class TestParseAsn1Generalizedtime(unittest.TestCase): @@ -2047,25 +1892,25 @@ class TestMakedirs(unittest.TestCase): shutil.rmtree(self.tmpdir) def testNonExisting(self): - path = utils.PathJoin(self.tmpdir, "foo") + path = PathJoin(self.tmpdir, "foo") utils.Makedirs(path) self.assert_(os.path.isdir(path)) def testExisting(self): - path = utils.PathJoin(self.tmpdir, "foo") + path = PathJoin(self.tmpdir, "foo") os.mkdir(path) utils.Makedirs(path) self.assert_(os.path.isdir(path)) def testRecursiveNonExisting(self): - path = utils.PathJoin(self.tmpdir, "foo/bar/baz") + path = PathJoin(self.tmpdir, "foo/bar/baz") utils.Makedirs(path) self.assert_(os.path.isdir(path)) def testRecursiveExisting(self): - path = utils.PathJoin(self.tmpdir, "B/moo/xyz") + path = PathJoin(self.tmpdir, "B/moo/xyz") self.assertFalse(os.path.exists(path)) - os.mkdir(utils.PathJoin(self.tmpdir, "B")) + os.mkdir(PathJoin(self.tmpdir, "B")) utils.Makedirs(path) self.assert_(os.path.isdir(path)) @@ -2195,16 +2040,16 @@ class TestReadLockedPidFile(unittest.TestCase): shutil.rmtree(self.tmpdir) def testNonExistent(self): - path = utils.PathJoin(self.tmpdir, "nonexist") + path = PathJoin(self.tmpdir, "nonexist") self.assert_(utils.ReadLockedPidFile(path) is None) def testUnlocked(self): - path = utils.PathJoin(self.tmpdir, "pid") + path = PathJoin(self.tmpdir, "pid") utils.WriteFile(path, data="123") self.assert_(utils.ReadLockedPidFile(path) is None) def testLocked(self): - path = utils.PathJoin(self.tmpdir, "pid") + path = PathJoin(self.tmpdir, "pid") utils.WriteFile(path, data="123") fl = utils.FileLock.Open(path) @@ -2218,8 +2063,8 @@ class TestReadLockedPidFile(unittest.TestCase): self.assert_(utils.ReadLockedPidFile(path) is None) def testError(self): - path = utils.PathJoin(self.tmpdir, "foobar", "pid") - utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="") + path = PathJoin(self.tmpdir, "foobar", "pid") + utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="") # open(2) should return ENOTDIR self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path) @@ -2370,15 +2215,15 @@ class TestEnsureDirs(unittest.TestCase): def testEnsureDirs(self): utils.EnsureDirs([ - (utils.PathJoin(self.dir, "foo"), 0777), - (utils.PathJoin(self.dir, "bar"), 0000), + (PathJoin(self.dir, "foo"), 0777), + (PathJoin(self.dir, "bar"), 0000), ]) - self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777) - self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000) + self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777) + self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000) def tearDown(self): - os.rmdir(utils.PathJoin(self.dir, "foo")) - os.rmdir(utils.PathJoin(self.dir, "bar")) + os.rmdir(PathJoin(self.dir, "foo")) + os.rmdir(PathJoin(self.dir, "bar")) os.rmdir(self.dir) os.umask(self.old_umask) @@ -2404,7 +2249,7 @@ class TestFormatSeconds(unittest.TestCase): self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s") -class RunIgnoreProcessNotFound(unittest.TestCase): +class TestIgnoreProcessNotFound(unittest.TestCase): @staticmethod def _WritePid(fd): os.write(fd, str(os.getpid())) @@ -2426,52 +2271,47 @@ class RunIgnoreProcessNotFound(unittest.TestCase): self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0)) -class TestIsValidIP4(unittest.TestCase): +class TestShellWriter(unittest.TestCase): def test(self): - self.assert_(utils.IsValidIP4("127.0.0.1")) - self.assert_(utils.IsValidIP4("0.0.0.0")) - self.assert_(utils.IsValidIP4("255.255.255.255")) - self.assertFalse(utils.IsValidIP4("0")) - self.assertFalse(utils.IsValidIP4("1")) - self.assertFalse(utils.IsValidIP4("1.1.1")) - self.assertFalse(utils.IsValidIP4("255.255.255.256")) - self.assertFalse(utils.IsValidIP4("::1")) + buf = StringIO() + sw = utils.ShellWriter(buf) + sw.Write("#!/bin/bash") + sw.Write("if true; then") + sw.IncIndent() + try: + sw.Write("echo true") + sw.Write("for i in 1 2 3") + sw.Write("do") + sw.IncIndent() + try: + self.assertEqual(sw._indent, 2) + sw.Write("date") + finally: + sw.DecIndent() + sw.Write("done") + finally: + sw.DecIndent() + sw.Write("echo %s", utils.ShellQuote("Hello World")) + sw.Write("exit 0") -class TestIsValidIP6(unittest.TestCase): - def test(self): - self.assert_(utils.IsValidIP6("::")) - self.assert_(utils.IsValidIP6("::1")) - self.assert_(utils.IsValidIP6("1" + (":1" * 7))) - self.assert_(utils.IsValidIP6("ffff" + (":ffff" * 7))) - self.assertFalse(utils.IsValidIP6("0")) - self.assertFalse(utils.IsValidIP6(":1")) - self.assertFalse(utils.IsValidIP6("f" + (":f" * 6))) - self.assertFalse(utils.IsValidIP6("fffg" + (":ffff" * 7))) - self.assertFalse(utils.IsValidIP6("fffff" + (":ffff" * 7))) - self.assertFalse(utils.IsValidIP6("1" + (":1" * 8))) - self.assertFalse(utils.IsValidIP6("127.0.0.1")) - - -class TestIsValidIP(unittest.TestCase): - def test(self): - self.assert_(utils.IsValidIP("0.0.0.0")) - self.assert_(utils.IsValidIP("127.0.0.1")) - self.assert_(utils.IsValidIP("::")) - self.assert_(utils.IsValidIP("::1")) - self.assertFalse(utils.IsValidIP("0")) - self.assertFalse(utils.IsValidIP("1.1.1.256")) - self.assertFalse(utils.IsValidIP("a:g::1")) + self.assertEqual(sw._indent, 0) + output = buf.getvalue() -class TestGetAddressFamily(unittest.TestCase): - def test(self): - self.assertEqual(utils.GetAddressFamily("127.0.0.1"), socket.AF_INET) - self.assertEqual(utils.GetAddressFamily("10.2.0.127"), socket.AF_INET) - self.assertEqual(utils.GetAddressFamily("::1"), socket.AF_INET6) - self.assertEqual(utils.GetAddressFamily("fe80::a00:27ff:fe08:5048"), - socket.AF_INET6) - self.assertRaises(errors.GenericError, utils.GetAddressFamily, "0") + self.assert_(output.endswith("\n")) + + lines = output.splitlines() + self.assertEqual(len(lines), 9) + self.assertEqual(lines[0], "#!/bin/bash") + self.assert_(re.match(r"^\s+date$", lines[5])) + self.assertEqual(lines[7], "echo 'Hello World'") + + def testEmpty(self): + buf = StringIO() + sw = utils.ShellWriter(buf) + sw = None + self.assertEqual(buf.getvalue(), "") if __name__ == '__main__':