import time
import tempfile
import os.path
+import os
import md5
+import signal
import socket
import shutil
import re
+import select
import ganeti
+import testutils
from ganeti import constants
from ganeti import utils
-from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
+from ganeti import errors
+from ganeti.utils import IsProcessAlive, RunCmd, \
RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
- SetEtcHostsEntry, RemoveEtcHostsEntry
-from ganeti.errors import LockError, UnitParseError
-
+ SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
+ TailFile, ForceDictType, IsNormAbsPath
-class GanetiTestCase(unittest.TestCase):
- def assertFileContent(self, file_name, content):
- """Checks the content of a file.
-
- """
- handle = open(file_name, 'r')
- try:
- self.assertEqual(handle.read(), content)
- finally:
- handle.close()
+from ganeti.errors import LockError, UnitParseError, GenericError, \
+ ProgrammerError
class TestIsProcessAlive(unittest.TestCase):
"""Testing case for IsProcessAlive"""
- def setUp(self):
- # create a zombie and a (hopefully) non-existing process id
- self.pid_zombie = os.fork()
- if self.pid_zombie == 0:
- os._exit(0)
- elif self.pid_zombie < 0:
- raise SystemError("can't fork")
- self.pid_non_existing = os.fork()
- if self.pid_non_existing == 0:
- os._exit(0)
- elif self.pid_non_existing > 0:
- os.waitpid(self.pid_non_existing, 0)
- else:
- raise SystemError("can't fork")
-
def testExists(self):
mypid = os.getpid()
self.assert_(IsProcessAlive(mypid),
"can't find myself running")
- def testZombie(self):
- self.assert_(not IsProcessAlive(self.pid_zombie),
- "zombie not detected as zombie")
-
-
def testNotExisting(self):
- self.assert_(not IsProcessAlive(self.pid_non_existing),
- "noexisting process detected")
-
-
-class TestLocking(unittest.TestCase):
- """Testing case for the Lock/Unlock functions"""
- def clean_lock(self, name):
- try:
- ganeti.utils.Unlock("unittest")
- except LockError:
- pass
-
-
- def testLock(self):
- self.clean_lock("unittest")
- self.assertEqual(None, Lock("unittest"))
+ pid_non_existing = os.fork()
+ if pid_non_existing == 0:
+ os._exit(0)
+ elif pid_non_existing < 0:
+ raise SystemError("can't fork")
+ os.waitpid(pid_non_existing, 0)
+ self.assert_(not IsProcessAlive(pid_non_existing),
+ "nonexisting process detected")
- def testUnlock(self):
- self.clean_lock("unittest")
- ganeti.utils.Lock("unittest")
- self.assertEqual(None, Unlock("unittest"))
+class TestPidFileFunctions(unittest.TestCase):
+ """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
+ utils.DaemonPidFileName = self.f_dpn
+
+ def testPidFileFunctions(self):
+ pid_file = self.f_dpn('test')
+ utils.WritePidFile('test')
+ self.failUnless(os.path.exists(pid_file),
+ "PID file should have been created")
+ read_pid = utils.ReadPidFile(pid_file)
+ self.failUnlessEqual(read_pid, os.getpid())
+ self.failUnless(utils.IsProcessAlive(read_pid))
+ self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
+ utils.RemovePidFile('test')
+ self.failIf(os.path.exists(pid_file),
+ "PID file should not exist anymore")
+ self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
+ "ReadPidFile should return 0 for missing pid file")
+ fh = open(pid_file, "w")
+ fh.write("blah\n")
+ fh.close()
+ self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
+ "ReadPidFile should return 0 for invalid pid file")
+ utils.RemovePidFile('test')
+ self.failIf(os.path.exists(pid_file),
+ "PID file should not exist anymore")
+
+ def testKill(self):
+ pid_file = self.f_dpn('child')
+ r_fd, w_fd = os.pipe()
+ new_pid = os.fork()
+ if new_pid == 0: #child
+ utils.WritePidFile('child')
+ os.write(w_fd, 'a')
+ signal.pause()
+ os._exit(0)
+ return
+ # else we are in the parent
+ # wait until the child has written the pid file
+ os.read(r_fd, 1)
+ read_pid = utils.ReadPidFile(pid_file)
+ self.failUnlessEqual(read_pid, new_pid)
+ self.failUnless(utils.IsProcessAlive(new_pid))
+ utils.KillProcess(new_pid, waitpid=True)
+ self.failIf(utils.IsProcessAlive(new_pid))
+ utils.RemovePidFile('child')
+ self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
- def testDoubleLock(self):
- self.clean_lock("unittest")
- ganeti.utils.Lock("unittest")
- self.assertRaises(LockError, Lock, "unittest")
+ def tearDown(self):
+ for name in os.listdir(self.dir):
+ os.unlink(os.path.join(self.dir, name))
+ os.rmdir(self.dir)
-class TestRunCmd(unittest.TestCase):
+class TestRunCmd(testutils.GanetiTestCase):
"""Testing case for the RunCmd function"""
def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
self.magic = time.ctime() + " ganeti test"
+ self.fname = self._CreateTempFile()
def testOk(self):
"""Test successful exit code"""
result = RunCmd("/bin/sh -c 'exit 0'")
self.assertEqual(result.exit_code, 0)
+ self.assertEqual(result.output, "")
def testFail(self):
"""Test fail exit code"""
result = RunCmd("/bin/sh -c 'exit 1'")
self.assertEqual(result.exit_code, 1)
-
+ self.assertEqual(result.output, "")
def testStdout(self):
"""Test standard output"""
cmd = 'echo -n "%s"' % self.magic
result = RunCmd("/bin/sh -c '%s'" % cmd)
self.assertEqual(result.stdout, self.magic)
-
+ result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
+ self.assertEqual(result.output, "")
+ self.assertFileContent(self.fname, self.magic)
def testStderr(self):
"""Test standard error"""
cmd = 'echo -n "%s"' % self.magic
result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
self.assertEqual(result.stderr, self.magic)
-
+ result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
+ self.assertEqual(result.output, "")
+ self.assertFileContent(self.fname, self.magic)
def testCombined(self):
"""Test combined output"""
cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
+ expected = "A" + self.magic + "B" + self.magic
result = RunCmd("/bin/sh -c '%s'" % cmd)
- self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
+ self.assertEqual(result.output, expected)
+ result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
+ self.assertEqual(result.output, "")
+ self.assertFileContent(self.fname, expected)
def testSignal(self):
- """Test standard error"""
- result = RunCmd("/bin/sh -c 'kill -15 $$'")
+ """Test signal"""
+ result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
self.assertEqual(result.signal, 15)
+ self.assertEqual(result.output, "")
def testListRun(self):
"""Test list runs"""
self.assertEqual(result.exit_code, 0)
self.assertEqual(result.stdout, self.magic)
+ def testFileEmptyOutput(self):
+ """Test file output"""
+ result = RunCmd(["true"], output=self.fname)
+ self.assertEqual(result.signal, None)
+ self.assertEqual(result.exit_code, 0)
+ self.assertFileContent(self.fname, "")
+
def testLang(self):
"""Test locale environment"""
old_env = os.environ.copy()
finally:
os.environ = old_env
+ def testDefaultCwd(self):
+ """Test default working directory"""
+ self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
+
+ def testCwd(self):
+ """Test default working directory"""
+ self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
+ self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
+ cwd = os.getcwd()
+ self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
+
class TestRemoveFile(unittest.TestCase):
"""Test case for the RemoveFile function"""
self.fail("File '%s' not removed" % symlink)
+class TestRename(unittest.TestCase):
+ """Test case for RenameFile"""
+
+ def setUp(self):
+ """Create a temporary directory"""
+ self.tmpdir = tempfile.mkdtemp()
+ self.tmpfile = os.path.join(self.tmpdir, "test1")
+
+ # Touch the file
+ open(self.tmpfile, "w").close()
+
+ def tearDown(self):
+ """Remove temporary directory"""
+ shutil.rmtree(self.tmpdir)
+
+ def testSimpleRename1(self):
+ """Simple rename 1"""
+ utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
+
+ def testSimpleRename2(self):
+ """Simple rename 2"""
+ utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
+ mkdir=True)
+
+ def testRenameMkdir(self):
+ """Rename with mkdir"""
+ utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
+ mkdir=True)
+
+
class TestCheckdict(unittest.TestCase):
"""Test case for the CheckDict function"""
"""Test case for the FormatUnit function"""
def testMiB(self):
- self.assertEqual(FormatUnit(1), '1M')
- self.assertEqual(FormatUnit(100), '100M')
- self.assertEqual(FormatUnit(1023), '1023M')
+ self.assertEqual(FormatUnit(1, 'h'), '1M')
+ self.assertEqual(FormatUnit(100, 'h'), '100M')
+ self.assertEqual(FormatUnit(1023, 'h'), '1023M')
+
+ self.assertEqual(FormatUnit(1, 'm'), '1')
+ self.assertEqual(FormatUnit(100, 'm'), '100')
+ self.assertEqual(FormatUnit(1023, 'm'), '1023')
+
+ self.assertEqual(FormatUnit(1024, 'm'), '1024')
+ self.assertEqual(FormatUnit(1536, 'm'), '1536')
+ self.assertEqual(FormatUnit(17133, 'm'), '17133')
+ self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
def testGiB(self):
- self.assertEqual(FormatUnit(1024), '1.0G')
- self.assertEqual(FormatUnit(1536), '1.5G')
- self.assertEqual(FormatUnit(17133), '16.7G')
- self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
+ self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
+ self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
+ self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
+ self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
+
+ self.assertEqual(FormatUnit(1024, 'g'), '1.0')
+ self.assertEqual(FormatUnit(1536, 'g'), '1.5')
+ self.assertEqual(FormatUnit(17133, 'g'), '16.7')
+ self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
+
+ self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
+ self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
+ self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
def testTiB(self):
- self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
- self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
- self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
+ self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
+ self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
+ self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
+ self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
+ self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
+ self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
class TestParseUnit(unittest.TestCase):
"""Test case for the ParseUnit function"""
self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
-class TestSshKeys(GanetiTestCase):
+class TestSshKeys(testutils.GanetiTestCase):
"""Test case for the AddAuthorizedKey function"""
KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
- def writeTestFile(self):
- (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
- f = os.fdopen(fd, 'w')
+ def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
+ self.tmpname = self._CreateTempFile()
+ handle = open(self.tmpname, 'w')
try:
- f.write(TestSshKeys.KEY_A)
- f.write("\n")
- f.write(TestSshKeys.KEY_B)
- f.write("\n")
+ handle.write("%s\n" % TestSshKeys.KEY_A)
+ handle.write("%s\n" % TestSshKeys.KEY_B)
finally:
- f.close()
-
- return tmpname
+ handle.close()
def testAddingNewKey(self):
- tmpname = self.writeTestFile()
- try:
- AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
+ AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
- self.assertFileContent(tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
- "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+ "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
- def testAddingAlmostButNotCompletlyTheSameKey(self):
- tmpname = self.writeTestFile()
- try:
- AddAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
-
- self.assertFileContent(tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
- finally:
- os.unlink(tmpname)
+ def testAddingAlmostButNotCompletelyTheSameKey(self):
+ AddAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
+
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
def testAddingExistingKeyWithSomeMoreSpaces(self):
- tmpname = self.writeTestFile()
- try:
- AddAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
+ AddAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
- self.assertFileContent(tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingExistingKeyWithSomeMoreSpaces(self):
- tmpname = self.writeTestFile()
- try:
- RemoveAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
+ RemoveAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
- self.assertFileContent(tmpname,
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingNonExistingKey(self):
- tmpname = self.writeTestFile()
- try:
- RemoveAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
+ RemoveAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
- self.assertFileContent(tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-class TestEtcHosts(GanetiTestCase):
+class TestEtcHosts(testutils.GanetiTestCase):
"""Test functions modifying /etc/hosts"""
- def writeTestFile(self):
- (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
- f = os.fdopen(fd, 'w')
+ def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
+ self.tmpname = self._CreateTempFile()
+ handle = open(self.tmpname, 'w')
try:
- f.write('# This is a test file for /etc/hosts\n')
- f.write('127.0.0.1\tlocalhost\n')
- f.write('192.168.1.1 router gw\n')
+ handle.write('# This is a test file for /etc/hosts\n')
+ handle.write('127.0.0.1\tlocalhost\n')
+ handle.write('192.168.1.1 router gw\n')
finally:
- f.close()
-
- return tmpname
+ handle.close()
def testSettingNewIp(self):
- tmpname = self.writeTestFile()
- try:
- SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
+ SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
- self.assertFileContent(tmpname,
- "# This is a test file for /etc/hosts\n"
- "127.0.0.1\tlocalhost\n"
- "192.168.1.1 router gw\n"
- "1.2.3.4\tmyhost.domain.tld myhost\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router gw\n"
+ "1.2.3.4\tmyhost.domain.tld myhost\n")
+ self.assertFileMode(self.tmpname, 0644)
def testSettingExistingIp(self):
- tmpname = self.writeTestFile()
- try:
- SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost'])
+ SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
+ ['myhost'])
- self.assertFileContent(tmpname,
- "# This is a test file for /etc/hosts\n"
- "127.0.0.1\tlocalhost\n"
- "192.168.1.1\tmyhost.domain.tld myhost\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1\tmyhost.domain.tld myhost\n")
+ self.assertFileMode(self.tmpname, 0644)
+
+ def testSettingDuplicateName(self):
+ SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
+
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router gw\n"
+ "1.2.3.4\tmyhost\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingExistingHost(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'router')
+ RemoveEtcHostsEntry(self.tmpname, 'router')
- self.assertFileContent(tmpname,
- "# This is a test file for /etc/hosts\n"
- "127.0.0.1\tlocalhost\n"
- "192.168.1.1 gw\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 gw\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingSingleExistingHost(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'localhost')
+ RemoveEtcHostsEntry(self.tmpname, 'localhost')
- self.assertFileContent(tmpname,
- "# This is a test file for /etc/hosts\n"
- "192.168.1.1 router gw\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "192.168.1.1 router gw\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingNonExistingHost(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'myhost')
+ RemoveEtcHostsEntry(self.tmpname, 'myhost')
- self.assertFileContent(tmpname,
- "# This is a test file for /etc/hosts\n"
- "127.0.0.1\tlocalhost\n"
- "192.168.1.1 router gw\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router gw\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingAlias(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'gw')
+ RemoveEtcHostsEntry(self.tmpname, 'gw')
- self.assertFileContent(tmpname,
- "# This is a test file for /etc/hosts\n"
- "127.0.0.1\tlocalhost\n"
- "192.168.1.1 router\n")
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router\n")
+ self.assertFileMode(self.tmpname, 0644)
class TestShellQuoting(unittest.TestCase):
def testTcpPingToLocalHostAccept(self):
self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
- constants.LOCALHOST_IP_ADDRESS,
self.listenerport,
timeout=10,
- live_port_needed=True),
+ live_port_needed=True,
+ source=constants.LOCALHOST_IP_ADDRESS,
+ ),
"failed to connect to test listener")
+ self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.listenerport,
+ timeout=10,
+ live_port_needed=True,
+ ),
+ "failed to connect to test listener (no source)")
+
class TestTcpPingDeaf(unittest.TestCase):
"""Testcase for TCP version of ping - against non listen(2)ing port"""
def testTcpPingToLocalHostAcceptDeaf(self):
self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
- constants.LOCALHOST_IP_ADDRESS,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
- live_port_needed=True), # need successful connect(2)
+ live_port_needed=True,
+ source=constants.LOCALHOST_IP_ADDRESS,
+ ), # need successful connect(2)
"successfully connected to deaf listener")
+ self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.deaflistenerport,
+ timeout=constants.TCP_PING_TIMEOUT,
+ live_port_needed=True,
+ ), # need successful connect(2)
+ "successfully connected to deaf listener (no source addr)")
+
def testTcpPingToLocalHostNoAccept(self):
self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
- constants.LOCALHOST_IP_ADDRESS,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
- live_port_needed=False), # ECONNREFUSED is OK
+ live_port_needed=False,
+ source=constants.LOCALHOST_IP_ADDRESS,
+ ), # ECONNREFUSED is OK
"failed to ping alive host on deaf port")
+ self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.deaflistenerport,
+ timeout=constants.TCP_PING_TIMEOUT,
+ live_port_needed=False,
+ ), # ECONNREFUSED is OK
+ "failed to ping alive host on deaf port (no source addr)")
+
+
+class TestOwnIpAddress(unittest.TestCase):
+ """Testcase for OwnIpAddress"""
+
+ def testOwnLoopback(self):
+ """check having the loopback ip"""
+ self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
+ "Should own the loopback address")
+
+ def testNowOwnAddress(self):
+ """check that I don't own an address"""
+
+ # network 192.0.2.0/24 is reserved for test/documentation as per
+ # rfc 3330, so we *should* not have an address of this range... if
+ # this fails, we should extend the test to multiple addresses
+ DST_IP = "192.0.2.1"
+ self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
+
class TestListVisibleFiles(unittest.TestCase):
"""Test case for ListVisibleFiles"""
self.failUnless(self._re_uuid.match(utils.NewUUID()))
+class TestUniqueSequence(unittest.TestCase):
+ """Test case for UniqueSequence"""
+
+ def _test(self, input, expected):
+ self.assertEqual(utils.UniqueSequence(input), expected)
+
+ def runTest(self):
+ # Ordered input
+ self._test([1, 2, 3], [1, 2, 3])
+ self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
+ self._test([1, 2, 2, 3], [1, 2, 3])
+ self._test([1, 2, 3, 3], [1, 2, 3])
+
+ # Unordered input
+ self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
+ self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
+
+ # Strings
+ self._test(["a", "a"], ["a"])
+ self._test(["a", "b"], ["a", "b"])
+ self._test(["a", "b", "a"], ["a", "b"])
+
+
+class TestFirstFree(unittest.TestCase):
+ """Test case for the FirstFree function"""
+
+ def test(self):
+ """Test FirstFree"""
+ self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
+ self.failUnlessEqual(FirstFree([]), None)
+ self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
+ self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
+ self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
+
+
+class TestTailFile(testutils.GanetiTestCase):
+ """Test case for the TailFile function"""
+
+ def testEmpty(self):
+ fname = self._CreateTempFile()
+ self.failUnlessEqual(TailFile(fname), [])
+ self.failUnlessEqual(TailFile(fname, lines=25), [])
+
+ def testAllLines(self):
+ data = ["test %d" % i for i in range(30)]
+ for i in range(30):
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("\n".join(data[:i]))
+ if i > 0:
+ fd.write("\n")
+ fd.close()
+ self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
+
+ def testPartialLines(self):
+ data = ["test %d" % i for i in range(30)]
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("\n".join(data))
+ fd.write("\n")
+ fd.close()
+ for i in range(1, 30):
+ self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+ def testBigFile(self):
+ data = ["test %d" % i for i in range(30)]
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("X" * 1048576)
+ fd.write("\n")
+ fd.write("\n".join(data))
+ fd.write("\n")
+ fd.close()
+ for i in range(1, 30):
+ self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+
+class TestFileLock(unittest.TestCase):
+ """Test case for the FileLock class"""
+
+ def setUp(self):
+ self.tmpfile = tempfile.NamedTemporaryFile()
+ self.lock = utils.FileLock(self.tmpfile.name)
+
+ def testSharedNonblocking(self):
+ self.lock.Shared(blocking=False)
+ self.lock.Close()
+
+ def testExclusiveNonblocking(self):
+ self.lock.Exclusive(blocking=False)
+ self.lock.Close()
+
+ def testUnlockNonblocking(self):
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testSharedBlocking(self):
+ self.lock.Shared(blocking=True)
+ self.lock.Close()
+
+ def testExclusiveBlocking(self):
+ self.lock.Exclusive(blocking=True)
+ self.lock.Close()
+
+ def testUnlockBlocking(self):
+ self.lock.Unlock(blocking=True)
+ self.lock.Close()
+
+ def testSharedExclusiveUnlock(self):
+ self.lock.Shared(blocking=False)
+ self.lock.Exclusive(blocking=False)
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testExclusiveSharedUnlock(self):
+ self.lock.Exclusive(blocking=False)
+ self.lock.Shared(blocking=False)
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testCloseShared(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
+
+ def testCloseExclusive(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
+
+ def testCloseUnlock(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
+
+
+class TestTimeFunctions(unittest.TestCase):
+ """Test case for time functions"""
+
+ def runTest(self):
+ self.assertEqual(utils.SplitTime(1), (1, 0))
+ self.assertEqual(utils.SplitTime(1.5), (1, 500000))
+ self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
+ self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
+ self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
+ self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
+ self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
+ self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
+
+ self.assertRaises(AssertionError, utils.SplitTime, -1)
+
+ self.assertEqual(utils.MergeTime((1, 0)), 1.0)
+ self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
+ self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
+
+ self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
+ self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
+
+ self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
+ self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
+ self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
+ self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
+ self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
+
+
+class FieldSetTestCase(unittest.TestCase):
+ """Test case for FieldSets"""
+
+ def testSimpleMatch(self):
+ f = utils.FieldSet("a", "b", "c", "def")
+ self.failUnless(f.Matches("a"))
+ self.failIf(f.Matches("d"), "Substring matched")
+ self.failIf(f.Matches("defghi"), "Prefix string matched")
+ self.failIf(f.NonMatching(["b", "c"]))
+ self.failIf(f.NonMatching(["a", "b", "c", "def"]))
+ self.failUnless(f.NonMatching(["a", "d"]))
+
+ def testRegexMatch(self):
+ f = utils.FieldSet("a", "b([0-9]+)", "c")
+ self.failUnless(f.Matches("b1"))
+ self.failUnless(f.Matches("b99"))
+ self.failIf(f.Matches("b/1"))
+ self.failIf(f.NonMatching(["b12", "c"]))
+ self.failUnless(f.NonMatching(["a", "1"]))
+
+class TestForceDictType(unittest.TestCase):
+ """Test case for ForceDictType"""
+
+ def setUp(self):
+ self.key_types = {
+ 'a': constants.VTYPE_INT,
+ 'b': constants.VTYPE_BOOL,
+ 'c': constants.VTYPE_STRING,
+ 'd': constants.VTYPE_SIZE,
+ }
+
+ def _fdt(self, dict, allowed_values=None):
+ if allowed_values is None:
+ ForceDictType(dict, self.key_types)
+ else:
+ ForceDictType(dict, self.key_types, allowed_values=allowed_values)
+
+ return dict
+
+ def testSimpleDict(self):
+ self.assertEqual(self._fdt({}), {})
+ self.assertEqual(self._fdt({'a': 1}), {'a': 1})
+ self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
+ self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
+ self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
+ self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
+ self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
+ self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
+ self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
+ self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
+ self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
+ self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+
+ def testErrors(self):
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+
+
+class TestIsAbsNormPath(unittest.TestCase):
+ """Testing case for IsProcessAlive"""
+
+ def _pathTestHelper(self, path, result):
+ if result:
+ self.assert_(IsNormAbsPath(path),
+ "Path %s should result absolute and normalized" % path)
+ else:
+ self.assert_(not IsNormAbsPath(path),
+ "Path %s should not result absolute and normalized" % path)
+
+ def testBase(self):
+ self._pathTestHelper('/etc', True)
+ self._pathTestHelper('/srv', True)
+ self._pathTestHelper('etc', False)
+ self._pathTestHelper('/etc/../root', False)
+ self._pathTestHelper('/etc/', False)
+
if __name__ == '__main__':
unittest.main()