import testutils
from ganeti import constants
from ganeti import utils
+from ganeti import errors
from ganeti.utils import IsProcessAlive, RunCmd, \
RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
- SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
+ SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
+ TailFile, ForceDictType, IsNormAbsPath
+
from ganeti.errors import LockError, UnitParseError, GenericError, \
ProgrammerError
"""Testing case for the RunCmd function"""
def setUp(self):
+ """Prepare a time-stamped marker string and a scratch file name for each test."""
+ testutils.GanetiTestCase.setUp(self)
self.magic = time.ctime() + " ganeti test"
- fh, self.fname = tempfile.mkstemp()
- os.close(fh)
-
- def tearDown(self):
- if self.fname:
- utils.RemoveFile(self.fname)
+ # The explicit tearDown is dropped by this change; presumably GanetiTestCase
+ # removes files obtained from _CreateTempFile -- TODO confirm in testutils.
+ self.fname = self._CreateTempFile()
def testOk(self):
"""Test successful exit code"""
'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
def setUp(self):
+ """Create a scratch file holding KEY_A and KEY_B, one key per line."""
- (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
+ testutils.GanetiTestCase.setUp(self)
+ # Cleanup of this file is no longer done here (tearDown is removed below);
+ # presumably GanetiTestCase handles it -- TODO confirm in testutils.
+ self.tmpname = self._CreateTempFile()
+ handle = open(self.tmpname, 'w')
try:
- handle = os.fdopen(fd, 'w')
- try:
- handle.write("%s\n" % TestSshKeys.KEY_A)
- handle.write("%s\n" % TestSshKeys.KEY_B)
- finally:
- handle.close()
- except:
- utils.RemoveFile(self.tmpname)
- raise
-
- def tearDown(self):
- utils.RemoveFile(self.tmpname)
- del self.tmpname
+ handle.write("%s\n" % TestSshKeys.KEY_A)
+ handle.write("%s\n" % TestSshKeys.KEY_B)
+ finally:
+ # Close the handle even if a write fails.
+ handle.close()
def testAddingNewKey(self):
AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
"""Test functions modifying /etc/hosts"""
def setUp(self):
+ """Create a scratch hosts file with a comment line and two host entries."""
- (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
+ testutils.GanetiTestCase.setUp(self)
+ # Cleanup of this file is no longer done here (tearDown is removed below);
+ # presumably GanetiTestCase handles it -- TODO confirm in testutils.
+ self.tmpname = self._CreateTempFile()
+ handle = open(self.tmpname, 'w')
try:
- handle = os.fdopen(fd, 'w')
- try:
- handle.write('# This is a test file for /etc/hosts\n')
- handle.write('127.0.0.1\tlocalhost\n')
- handle.write('192.168.1.1 router gw\n')
- finally:
- handle.close()
- except:
- utils.RemoveFile(self.tmpname)
- raise
-
- def tearDown(self):
- utils.RemoveFile(self.tmpname)
- del self.tmpname
+ handle.write('# This is a test file for /etc/hosts\n')
+ handle.write('127.0.0.1\tlocalhost\n')
+ handle.write('192.168.1.1 router gw\n')
+ finally:
+ # Close the handle even if a write fails.
+ handle.close()
def testSettingNewIp(self):
SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
"127.0.0.1\tlocalhost\n"
"192.168.1.1 router gw\n"
"1.2.3.4\tmyhost.domain.tld myhost\n")
+ self.assertFileMode(self.tmpname, 0644)
def testSettingExistingIp(self):
SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
"# This is a test file for /etc/hosts\n"
"127.0.0.1\tlocalhost\n"
"192.168.1.1\tmyhost.domain.tld myhost\n")
+ self.assertFileMode(self.tmpname, 0644)
def testSettingDuplicateName(self):
SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
"127.0.0.1\tlocalhost\n"
"192.168.1.1 router gw\n"
"1.2.3.4\tmyhost\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingExistingHost(self):
RemoveEtcHostsEntry(self.tmpname, 'router')
"# This is a test file for /etc/hosts\n"
"127.0.0.1\tlocalhost\n"
"192.168.1.1 gw\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingSingleExistingHost(self):
RemoveEtcHostsEntry(self.tmpname, 'localhost')
self.assertFileContent(self.tmpname,
"# This is a test file for /etc/hosts\n"
"192.168.1.1 router gw\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingNonExistingHost(self):
RemoveEtcHostsEntry(self.tmpname, 'myhost')
"# This is a test file for /etc/hosts\n"
"127.0.0.1\tlocalhost\n"
"192.168.1.1 router gw\n")
+ self.assertFileMode(self.tmpname, 0644)
def testRemovingAlias(self):
RemoveEtcHostsEntry(self.tmpname, 'gw')
"# This is a test file for /etc/hosts\n"
"127.0.0.1\tlocalhost\n"
"192.168.1.1 router\n")
+ self.assertFileMode(self.tmpname, 0644)
class TestShellQuoting(unittest.TestCase):
self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
+class TestTailFile(testutils.GanetiTestCase):
+ """Test case for the TailFile function"""
+
+ def testEmpty(self):
+ """A file nothing was written to yields an empty list, with or without lines=."""
+ fname = self._CreateTempFile()
+ self.failUnlessEqual(TailFile(fname), [])
+ self.failUnlessEqual(TailFile(fname, lines=25), [])
+
+ def testAllLines(self):
+ """Asking for exactly as many lines as the file holds returns all of them."""
+ data = ["test %d" % i for i in range(30)]
+ for i in range(30):
+ # A new file per iteration, holding the first i lines of data.
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("\n".join(data[:i]))
+ # Terminate the last line, except for i == 0 where the file stays empty.
+ if i > 0:
+ fd.write("\n")
+ fd.close()
+ self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
+
+ def testPartialLines(self):
+ """Asking for fewer lines than the file holds returns only the last ones."""
+ data = ["test %d" % i for i in range(30)]
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("\n".join(data))
+ fd.write("\n")
+ fd.close()
+ # One 30-line file is reused; each i from 1 to 29 must return the last i lines.
+ for i in range(1, 30):
+ self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+ def testBigFile(self):
+ """The last lines are still returned when a very long line precedes them."""
+ data = ["test %d" % i for i in range(30)]
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ # 1048576 "X" characters written as one line ahead of the 30 test lines;
+ # presumably meant to exceed whatever amount TailFile reads -- TODO confirm.
+ fd.write("X" * 1048576)
+ fd.write("\n")
+ fd.write("\n".join(data))
+ fd.write("\n")
+ fd.close()
+ for i in range(1, 30):
+ self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+
class TestFileLock(unittest.TestCase):
"""Test case for the FileLock class"""
self.failIf(f.NonMatching(["b12", "c"]))
self.failUnless(f.NonMatching(["a", "1"]))
+class TestForceDictType(unittest.TestCase):
+ """Test case for ForceDictType"""
+
+ def setUp(self):
+ # Type map used by every test: one key per VTYPE_* constant exercised below.
+ self.key_types = {
+ 'a': constants.VTYPE_INT,
+ 'b': constants.VTYPE_BOOL,
+ 'c': constants.VTYPE_STRING,
+ 'd': constants.VTYPE_SIZE,
+ }
+
+ def _fdt(self, dict, allowed_values=None):
+ """Run ForceDictType on dict against self.key_types and return that same dict.
+
+ allowed_values is forwarded only when it is not None. The callers compare
+ the returned object, so they rely on ForceDictType converting values in place.
+ """
+ # NOTE(review): the parameter name shadows the builtin dict inside this helper.
+ if allowed_values is None:
+ ForceDictType(dict, self.key_types)
+ else:
+ ForceDictType(dict, self.key_types, allowed_values=allowed_values)
+
+ return dict
+
+ def testSimpleDict(self):
+ """Values that can be coerced end up with the expected converted value."""
+ self.assertEqual(self._fdt({}), {})
+ # Integer key: ints stay as they are, numeric strings become ints.
+ self.assertEqual(self._fdt({'a': 1}), {'a': 1})
+ self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
+ # Boolean key: 1 becomes True.
+ self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
+ self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
+ # String key: False is expected to turn into the empty string.
+ self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
+ # Boolean key: 'true'/'false' strings are accepted in either capitalization.
+ self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
+ self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
+ self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
+ self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
+ # Size key: both '4' and '4M' are expected to become 4
+ # (presumably sizes are expressed in mebibytes -- TODO confirm).
+ self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
+ self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+
+ def testErrors(self):
+ """Values that cannot be coerced raise errors.TypeEnforcementError."""
+ # Non-numeric string for the integer key.
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
+ # True for the string key.
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
+ # Size strings that do not parse.
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+
+
+class TestIsAbsNormPath(unittest.TestCase):
+ """Testing case for IsNormAbsPath"""
+
+ def _pathTestHelper(self, path, result):
+ """Assert that the truth value of IsNormAbsPath(path) matches result."""
+ if result:
+ self.assert_(IsNormAbsPath(path),
+ "Path %s should result absolute and normalized" % path)
+ else:
+ self.assert_(not IsNormAbsPath(path),
+ "Path %s should not result absolute and normalized" % path)
+
+ def testBase(self):
+ """Check plain absolute paths plus relative and non-normalized ones."""
+ self._pathTestHelper('/etc', True)
+ self._pathTestHelper('/srv', True)
+ # Relative path.
+ self._pathTestHelper('etc', False)
+ # Contains a ".." component.
+ self._pathTestHelper('/etc/../root', False)
+ # Trailing slash.
+ self._pathTestHelper('/etc/', False)
if __name__ == '__main__':
unittest.main()