X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/9440aeab6a7526f73a74f39dd55f7dc4633bdd21..ba55d062da8dfb89a37afc2f13f2e689d0094829:/test/ganeti.utils_unittest.py?ds=inline diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 99b57dd..846b4b4 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -26,122 +26,160 @@ import os import time import tempfile import os.path +import os import md5 +import signal import socket import shutil import re +import select import ganeti +import testutils from ganeti import constants from ganeti import utils -from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \ +from ganeti import errors +from ganeti.utils import IsProcessAlive, RunCmd, \ RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \ ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \ ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \ - SetEtcHostsEntry, RemoveEtcHostsEntry -from ganeti.errors import LockError, UnitParseError + SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \ + TailFile, ForceDictType, IsNormAbsPath + +from ganeti.errors import LockError, UnitParseError, GenericError, \ + ProgrammerError class TestIsProcessAlive(unittest.TestCase): """Testing case for IsProcessAlive""" - def setUp(self): - # create a zombie and a (hopefully) non-existing process id - self.pid_zombie = os.fork() - if self.pid_zombie == 0: - os._exit(0) - elif self.pid_zombie < 0: - raise SystemError("can't fork") - self.pid_non_existing = os.fork() - if self.pid_non_existing == 0: - os._exit(0) - elif self.pid_non_existing > 0: - os.waitpid(self.pid_non_existing, 0) - else: - raise SystemError("can't fork") - def testExists(self): mypid = os.getpid() self.assert_(IsProcessAlive(mypid), "can't find myself running") - def testZombie(self): - self.assert_(not IsProcessAlive(self.pid_zombie), - "zombie not detected as zombie") - - def testNotExisting(self): - self.assert_(not IsProcessAlive(self.pid_non_existing), - "noexisting process detected") - - -class TestLocking(unittest.TestCase): - """Testing case for the Lock/Unlock functions""" - def clean_lock(self, name): - try: - ganeti.utils.Unlock("unittest") - except LockError: - pass - - - def testLock(self): - self.clean_lock("unittest") - self.assertEqual(None, Lock("unittest")) + pid_non_existing = os.fork() + if pid_non_existing == 0: + os._exit(0) + elif pid_non_existing < 0: + raise SystemError("can't fork") + os.waitpid(pid_non_existing, 0) + self.assert_(not IsProcessAlive(pid_non_existing), + "nonexisting process detected") - def testUnlock(self): - self.clean_lock("unittest") - ganeti.utils.Lock("unittest") - self.assertEqual(None, Unlock("unittest")) +class TestPidFileFunctions(unittest.TestCase): + """Tests for WritePidFile, RemovePidFile and ReadPidFile""" + def setUp(self): + self.dir = tempfile.mkdtemp() + self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name) + utils.DaemonPidFileName = self.f_dpn + + def testPidFileFunctions(self): + pid_file = self.f_dpn('test') + utils.WritePidFile('test') + self.failUnless(os.path.exists(pid_file), + "PID file should have been created") + read_pid = utils.ReadPidFile(pid_file) + self.failUnlessEqual(read_pid, os.getpid()) + self.failUnless(utils.IsProcessAlive(read_pid)) + self.failUnlessRaises(GenericError, utils.WritePidFile, 'test') + utils.RemovePidFile('test') + self.failIf(os.path.exists(pid_file), + "PID file should not exist anymore") + self.failUnlessEqual(utils.ReadPidFile(pid_file), 0, + "ReadPidFile should return 0 for missing pid file") + fh = open(pid_file, "w") + fh.write("blah\n") + fh.close() + self.failUnlessEqual(utils.ReadPidFile(pid_file), 0, + "ReadPidFile should return 0 for invalid pid file") + utils.RemovePidFile('test') + self.failIf(os.path.exists(pid_file), + "PID file should not exist anymore") + + def testKill(self): + pid_file = self.f_dpn('child') + r_fd, w_fd = os.pipe() + new_pid = os.fork() + if new_pid == 0: #child + utils.WritePidFile('child') + os.write(w_fd, 'a') + signal.pause() + os._exit(0) + return + # else we are in the parent + # wait until the child has written the pid file + os.read(r_fd, 1) + read_pid = utils.ReadPidFile(pid_file) + self.failUnlessEqual(read_pid, new_pid) + self.failUnless(utils.IsProcessAlive(new_pid)) + utils.KillProcess(new_pid, waitpid=True) + self.failIf(utils.IsProcessAlive(new_pid)) + utils.RemovePidFile('child') + self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0) - def testDoubleLock(self): - self.clean_lock("unittest") - ganeti.utils.Lock("unittest") - self.assertRaises(LockError, Lock, "unittest") + def tearDown(self): + for name in os.listdir(self.dir): + os.unlink(os.path.join(self.dir, name)) + os.rmdir(self.dir) -class TestRunCmd(unittest.TestCase): +class TestRunCmd(testutils.GanetiTestCase): """Testing case for the RunCmd function""" def setUp(self): + testutils.GanetiTestCase.setUp(self) self.magic = time.ctime() + " ganeti test" + self.fname = self._CreateTempFile() def testOk(self): """Test successful exit code""" result = RunCmd("/bin/sh -c 'exit 0'") self.assertEqual(result.exit_code, 0) + self.assertEqual(result.output, "") def testFail(self): """Test fail exit code""" result = RunCmd("/bin/sh -c 'exit 1'") self.assertEqual(result.exit_code, 1) - + self.assertEqual(result.output, "") def testStdout(self): """Test standard output""" cmd = 'echo -n "%s"' % self.magic result = RunCmd("/bin/sh -c '%s'" % cmd) self.assertEqual(result.stdout, self.magic) - + result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, self.magic) def testStderr(self): """Test standard error""" cmd = 'echo -n "%s"' % self.magic result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd) self.assertEqual(result.stderr, self.magic) - + result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, self.magic) def testCombined(self): """Test combined output""" cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic) + expected = "A" + self.magic + "B" + self.magic result = RunCmd("/bin/sh -c '%s'" % cmd) - self.assertEqual(result.output, "A" + self.magic + "B" + self.magic) + self.assertEqual(result.output, expected) + result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, expected) def testSignal(self): - """Test standard error""" - result = RunCmd("/bin/sh -c 'kill -15 $$'") + """Test signal""" + result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"]) self.assertEqual(result.signal, 15) + self.assertEqual(result.output, "") def testListRun(self): """Test list runs""" @@ -156,6 +194,13 @@ class TestRunCmd(unittest.TestCase): self.assertEqual(result.exit_code, 0) self.assertEqual(result.stdout, self.magic) + def testFileEmptyOutput(self): + """Test file output""" + result = RunCmd(["true"], output=self.fname) + self.assertEqual(result.signal, None) + self.assertEqual(result.exit_code, 0) + self.assertFileContent(self.fname, "") + def testLang(self): """Test locale environment""" old_env = os.environ.copy() @@ -173,6 +218,17 @@ class TestRunCmd(unittest.TestCase): finally: os.environ = old_env + def testDefaultCwd(self): + """Test default working directory""" + self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/") + + def testCwd(self): + """Test default working directory""" + self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/") + self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp") + cwd = os.getcwd() + self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd) + class TestRemoveFile(unittest.TestCase): """Test case for the RemoveFile function""" @@ -220,6 +276,36 @@ class TestRemoveFile(unittest.TestCase): self.fail("File '%s' not removed" % symlink) +class TestRename(unittest.TestCase): + """Test case for RenameFile""" + + def setUp(self): + """Create a temporary directory""" + self.tmpdir = tempfile.mkdtemp() + self.tmpfile = os.path.join(self.tmpdir, "test1") + + # Touch the file + open(self.tmpfile, "w").close() + + def tearDown(self): + """Remove temporary directory""" + shutil.rmtree(self.tmpdir) + + def testSimpleRename1(self): + """Simple rename 1""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz")) + + def testSimpleRename2(self): + """Simple rename 2""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"), + mkdir=True) + + def testRenameMkdir(self): + """Rename with mkdir""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"), + mkdir=True) + + class TestCheckdict(unittest.TestCase): """Test case for the CheckDict function""" @@ -267,21 +353,42 @@ class TestFormatUnit(unittest.TestCase): """Test case for the FormatUnit function""" def testMiB(self): - self.assertEqual(FormatUnit(1), '1M') - self.assertEqual(FormatUnit(100), '100M') - self.assertEqual(FormatUnit(1023), '1023M') + self.assertEqual(FormatUnit(1, 'h'), '1M') + self.assertEqual(FormatUnit(100, 'h'), '100M') + self.assertEqual(FormatUnit(1023, 'h'), '1023M') + + self.assertEqual(FormatUnit(1, 'm'), '1') + self.assertEqual(FormatUnit(100, 'm'), '100') + self.assertEqual(FormatUnit(1023, 'm'), '1023') + + self.assertEqual(FormatUnit(1024, 'm'), '1024') + self.assertEqual(FormatUnit(1536, 'm'), '1536') + self.assertEqual(FormatUnit(17133, 'm'), '17133') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575') def testGiB(self): - self.assertEqual(FormatUnit(1024), '1.0G') - self.assertEqual(FormatUnit(1536), '1.5G') - self.assertEqual(FormatUnit(17133), '16.7G') - self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G') + self.assertEqual(FormatUnit(1024, 'h'), '1.0G') + self.assertEqual(FormatUnit(1536, 'h'), '1.5G') + self.assertEqual(FormatUnit(17133, 'h'), '16.7G') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G') + + self.assertEqual(FormatUnit(1024, 'g'), '1.0') + self.assertEqual(FormatUnit(1536, 'g'), '1.5') + self.assertEqual(FormatUnit(17133, 'g'), '16.7') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0') + + self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0') + self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0') + self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0') def testTiB(self): - self.assertEqual(FormatUnit(1024 * 1024), '1.0T') - self.assertEqual(FormatUnit(5120 * 1024), '5.0T') - self.assertEqual(FormatUnit(29829 * 1024), '29.1T') + self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T') + self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T') + self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T') + self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0') + self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0') + self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1') class TestParseUnit(unittest.TestCase): """Test case for the ParseUnit function""" @@ -323,7 +430,8 @@ class TestParseUnit(unittest.TestCase): for sep in ('', ' ', ' ', "\t", "\t "): for suffix, scale in TestParseUnit.SCALES: for func in (lambda x: x, str.lower, str.upper): - self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale) + self.assertEqual(ParseUnit('1024' + sep + func(suffix)), + 1024 * scale) def testInvalidInput(self): for sep in ('-', '_', ',', 'a'): @@ -334,202 +442,147 @@ class TestParseUnit(unittest.TestCase): self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix) -class TestSshKeys(unittest.TestCase): +class TestSshKeys(testutils.GanetiTestCase): """Test case for the AddAuthorizedKey function""" KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a' KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" ' 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') - # NOTE: The MD5 sums below were calculated after manually - # checking the output files. - - def writeTestFile(self): - (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test') - f = os.fdopen(fd, 'w') + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - f.write(TestSshKeys.KEY_A) - f.write("\n") - f.write(TestSshKeys.KEY_B) - f.write("\n") + handle.write("%s\n" % TestSshKeys.KEY_A) + handle.write("%s\n" % TestSshKeys.KEY_B) finally: - f.close() - - return tmpname + handle.close() def testAddingNewKey(self): - tmpname = self.writeTestFile() - try: - AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') + AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - 'ccc71523108ca6e9d0343797dc3e9f16') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" + "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n") - def testAddingAlmostButNotCompletlyTheSameKey(self): - tmpname = self.writeTestFile() - try: - AddAuthorizedKey(tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test') + def testAddingAlmostButNotCompletelyTheSameKey(self): + AddAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - 'f2c939d57addb5b3a6846884be896b46') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n") def testAddingExistingKeyWithSomeMoreSpaces(self): - tmpname = self.writeTestFile() - try: - AddAuthorizedKey(tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') + AddAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - '4e612764808bd46337eb0f575415fc30') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testRemovingExistingKeyWithSomeMoreSpaces(self): - tmpname = self.writeTestFile() - try: - RemoveAuthorizedKey(tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') + RemoveAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - '77516d987fca07f70e30b830b3e4f2ed') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testRemovingNonExistingKey(self): - tmpname = self.writeTestFile() - try: - RemoveAuthorizedKey(tmpname, - 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test') + RemoveAuthorizedKey(self.tmpname, + 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - '4e612764808bd46337eb0f575415fc30') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" + 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"' + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") -class TestEtcHosts(unittest.TestCase): +class TestEtcHosts(testutils.GanetiTestCase): """Test functions modifying /etc/hosts""" - def writeTestFile(self): - (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test') - f = os.fdopen(fd, 'w') + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - f.write('# This is a test file for /etc/hosts\n') - f.write('127.0.0.1\tlocalhost\n') - f.write('192.168.1.1 router gw\n') + handle.write('# This is a test file for /etc/hosts\n') + handle.write('127.0.0.1\tlocalhost\n') + handle.write('192.168.1.1 router gw\n') finally: - f.close() - - return tmpname + handle.close() def testSettingNewIp(self): - tmpname = self.writeTestFile() - try: - SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) + SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - '410c141dcafffd505f662a41713d2eab') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router gw\n" + "1.2.3.4\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingExistingIp(self): - tmpname = self.writeTestFile() - try: - SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost']) + SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld', + ['myhost']) - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - 'bbf60c542dec949f3968b59522ec0d7b') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) + + def testSettingDuplicateName(self): + SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost']) + + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router gw\n" + "1.2.3.4\tmyhost\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingExistingHost(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'router') + RemoveEtcHostsEntry(self.tmpname, 'router') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - '8b09207a23709d60240674601a3548b2') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingSingleExistingHost(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'localhost') + RemoveEtcHostsEntry(self.tmpname, 'localhost') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - 'ec4e4589b56f82fdb88db5675de011b1') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingNonExistingHost(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'myhost') + RemoveEtcHostsEntry(self.tmpname, 'myhost') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - 'aa005bddc6d9ee399c296953f194929e') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingAlias(self): - tmpname = self.writeTestFile() - try: - RemoveEtcHostsEntry(tmpname, 'gw') + RemoveEtcHostsEntry(self.tmpname, 'gw') - f = open(tmpname, 'r') - try: - self.assertEqual(md5.new(f.read(8192)).hexdigest(), - '156dd3980a17b2ef934e2d0bf670aca2') - finally: - f.close() - finally: - os.unlink(tmpname) + self.assertFileContent(self.tmpname, + "# This is a test file for /etc/hosts\n" + "127.0.0.1\tlocalhost\n" + "192.168.1.1 router\n") + self.assertFileMode(self.tmpname, 0644) class TestShellQuoting(unittest.TestCase): @@ -564,12 +617,20 @@ class TestTcpPing(unittest.TestCase): def testTcpPingToLocalHostAccept(self): self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, - constants.LOCALHOST_IP_ADDRESS, self.listenerport, timeout=10, - live_port_needed=True), + live_port_needed=True, + source=constants.LOCALHOST_IP_ADDRESS, + ), "failed to connect to test listener") + self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, + self.listenerport, + timeout=10, + live_port_needed=True, + ), + "failed to connect to test listener (no source)") + class TestTcpPingDeaf(unittest.TestCase): """Testcase for TCP version of ping - against non listen(2)ing port""" @@ -585,20 +646,54 @@ class TestTcpPingDeaf(unittest.TestCase): def testTcpPingToLocalHostAcceptDeaf(self): self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS, - constants.LOCALHOST_IP_ADDRESS, self.deaflistenerport, timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=True), # need successful connect(2) + live_port_needed=True, + source=constants.LOCALHOST_IP_ADDRESS, + ), # need successful connect(2) "successfully connected to deaf listener") + self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS, + self.deaflistenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=True, + ), # need successful connect(2) + "successfully connected to deaf listener (no source addr)") + def testTcpPingToLocalHostNoAccept(self): self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, - constants.LOCALHOST_IP_ADDRESS, self.deaflistenerport, timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=False), # ECONNREFUSED is OK + live_port_needed=False, + source=constants.LOCALHOST_IP_ADDRESS, + ), # ECONNREFUSED is OK "failed to ping alive host on deaf port") + self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS, + self.deaflistenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=False, + ), # ECONNREFUSED is OK + "failed to ping alive host on deaf port (no source addr)") + + +class TestOwnIpAddress(unittest.TestCase): + """Testcase for OwnIpAddress""" + + def testOwnLoopback(self): + """check having the loopback ip""" + self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS), + "Should own the loopback address") + + def testNowOwnAddress(self): + """check that I don't own an address""" + + # network 192.0.2.0/24 is reserved for test/documentation as per + # rfc 3330, so we *should* not have an address of this range... if + # this fails, we should extend the test to multiple addresses + DST_IP = "192.0.2.1" + self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP) + class TestListVisibleFiles(unittest.TestCase): """Test case for ListVisibleFiles""" @@ -652,5 +747,245 @@ class TestNewUUID(unittest.TestCase): self.failUnless(self._re_uuid.match(utils.NewUUID())) +class TestUniqueSequence(unittest.TestCase): + """Test case for UniqueSequence""" + + def _test(self, input, expected): + self.assertEqual(utils.UniqueSequence(input), expected) + + def runTest(self): + # Ordered input + self._test([1, 2, 3], [1, 2, 3]) + self._test([1, 1, 2, 2, 3, 3], [1, 2, 3]) + self._test([1, 2, 2, 3], [1, 2, 3]) + self._test([1, 2, 3, 3], [1, 2, 3]) + + # Unordered input + self._test([1, 2, 3, 1, 2, 3], [1, 2, 3]) + self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3]) + + # Strings + self._test(["a", "a"], ["a"]) + self._test(["a", "b"], ["a", "b"]) + self._test(["a", "b", "a"], ["a", "b"]) + + +class TestFirstFree(unittest.TestCase): + """Test case for the FirstFree function""" + + def test(self): + """Test FirstFree""" + self.failUnlessEqual(FirstFree([0, 1, 3]), 2) + self.failUnlessEqual(FirstFree([]), None) + self.failUnlessEqual(FirstFree([3, 4, 6]), 0) + self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5) + self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3) + + +class TestTailFile(testutils.GanetiTestCase): + """Test case for the TailFile function""" + + def testEmpty(self): + fname = self._CreateTempFile() + self.failUnlessEqual(TailFile(fname), []) + self.failUnlessEqual(TailFile(fname, lines=25), []) + + def testAllLines(self): + data = ["test %d" % i for i in range(30)] + for i in range(30): + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data[:i])) + if i > 0: + fd.write("\n") + fd.close() + self.failUnlessEqual(TailFile(fname, lines=i), data[:i]) + + def testPartialLines(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + def testBigFile(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("X" * 1048576) + fd.write("\n") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + +class TestFileLock(unittest.TestCase): + """Test case for the FileLock class""" + + def setUp(self): + self.tmpfile = tempfile.NamedTemporaryFile() + self.lock = utils.FileLock(self.tmpfile.name) + + def testSharedNonblocking(self): + self.lock.Shared(blocking=False) + self.lock.Close() + + def testExclusiveNonblocking(self): + self.lock.Exclusive(blocking=False) + self.lock.Close() + + def testUnlockNonblocking(self): + self.lock.Unlock(blocking=False) + self.lock.Close() + + def testSharedBlocking(self): + self.lock.Shared(blocking=True) + self.lock.Close() + + def testExclusiveBlocking(self): + self.lock.Exclusive(blocking=True) + self.lock.Close() + + def testUnlockBlocking(self): + self.lock.Unlock(blocking=True) + self.lock.Close() + + def testSharedExclusiveUnlock(self): + self.lock.Shared(blocking=False) + self.lock.Exclusive(blocking=False) + self.lock.Unlock(blocking=False) + self.lock.Close() + + def testExclusiveSharedUnlock(self): + self.lock.Exclusive(blocking=False) + self.lock.Shared(blocking=False) + self.lock.Unlock(blocking=False) + self.lock.Close() + + def testCloseShared(self): + self.lock.Close() + self.assertRaises(AssertionError, self.lock.Shared, blocking=False) + + def testCloseExclusive(self): + self.lock.Close() + self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False) + + def testCloseUnlock(self): + self.lock.Close() + self.assertRaises(AssertionError, self.lock.Unlock, blocking=False) + + +class TestTimeFunctions(unittest.TestCase): + """Test case for time functions""" + + def runTest(self): + self.assertEqual(utils.SplitTime(1), (1, 0)) + self.assertEqual(utils.SplitTime(1.5), (1, 500000)) + self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915)) + self.assertEqual(utils.SplitTime(123.48012), (123, 480120)) + self.assertEqual(utils.SplitTime(123.9996), (123, 999600)) + self.assertEqual(utils.SplitTime(123.9995), (123, 999500)) + self.assertEqual(utils.SplitTime(123.9994), (123, 999400)) + self.assertEqual(utils.SplitTime(123.999999999), (123, 999999)) + + self.assertRaises(AssertionError, utils.SplitTime, -1) + + self.assertEqual(utils.MergeTime((1, 0)), 1.0) + self.assertEqual(utils.MergeTime((1, 500000)), 1.5) + self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5) + + self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481) + self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801) + + self.assertRaises(AssertionError, utils.MergeTime, (0, -1)) + self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000)) + self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999)) + self.assertRaises(AssertionError, utils.MergeTime, (-1, 0)) + self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0)) + + +class FieldSetTestCase(unittest.TestCase): + """Test case for FieldSets""" + + def testSimpleMatch(self): + f = utils.FieldSet("a", "b", "c", "def") + self.failUnless(f.Matches("a")) + self.failIf(f.Matches("d"), "Substring matched") + self.failIf(f.Matches("defghi"), "Prefix string matched") + self.failIf(f.NonMatching(["b", "c"])) + self.failIf(f.NonMatching(["a", "b", "c", "def"])) + self.failUnless(f.NonMatching(["a", "d"])) + + def testRegexMatch(self): + f = utils.FieldSet("a", "b([0-9]+)", "c") + self.failUnless(f.Matches("b1")) + self.failUnless(f.Matches("b99")) + self.failIf(f.Matches("b/1")) + self.failIf(f.NonMatching(["b12", "c"])) + self.failUnless(f.NonMatching(["a", "1"])) + +class TestForceDictType(unittest.TestCase): + """Test case for ForceDictType""" + + def setUp(self): + self.key_types = { + 'a': constants.VTYPE_INT, + 'b': constants.VTYPE_BOOL, + 'c': constants.VTYPE_STRING, + 'd': constants.VTYPE_SIZE, + } + + def _fdt(self, dict, allowed_values=None): + if allowed_values is None: + ForceDictType(dict, self.key_types) + else: + ForceDictType(dict, self.key_types, allowed_values=allowed_values) + + return dict + + def testSimpleDict(self): + self.assertEqual(self._fdt({}), {}) + self.assertEqual(self._fdt({'a': 1}), {'a': 1}) + self.assertEqual(self._fdt({'a': '1'}), {'a': 1}) + self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True}) + self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'}) + self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''}) + self.assertEqual(self._fdt({'b': 'false'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'False'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'true'}), {'b': True}) + self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) + self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) + self.assertEqual(self._fdt({'d': '4M'}), {'d': 4}) + + def testErrors(self): + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) + + +class TestIsAbsNormPath(unittest.TestCase): + """Testing case for IsProcessAlive""" + + def _pathTestHelper(self, path, result): + if result: + self.assert_(IsNormAbsPath(path), + "Path %s should result absolute and normalized" % path) + else: + self.assert_(not IsNormAbsPath(path), + "Path %s should not result absolute and normalized" % path) + + def testBase(self): + self._pathTestHelper('/etc', True) + self._pathTestHelper('/srv', True) + self._pathTestHelper('etc', False) + self._pathTestHelper('/etc/../root', False) + self._pathTestHelper('/etc/', False) + if __name__ == '__main__': unittest.main()