RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
- AddEtcHostsEntry, RemoveEtcHostsEntry
+ SetEtcHostsEntry, RemoveEtcHostsEntry
from ganeti.errors import LockError, UnitParseError
+class GanetiTestCase(unittest.TestCase):
+ def assertFileContent(self, file_name, content):
+ """Checks the content of a file.
+
+ """
+ handle = open(file_name, 'r')
+ try:
+ self.assertEqual(handle.read(), content)
+ finally:
+ handle.close()
+
+
class TestIsProcessAlive(unittest.TestCase):
"""Testing case for IsProcessAlive"""
def setUp(self):
for sep in ('', ' ', ' ', "\t", "\t "):
for suffix, scale in TestParseUnit.SCALES:
for func in (lambda x: x, str.lower, str.upper):
- self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
+ self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
+ 1024 * scale)
def testInvalidInput(self):
for sep in ('-', '_', ',', 'a'):
self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
-class TestSshKeys(unittest.TestCase):
+class TestSshKeys(GanetiTestCase):
"""Test case for the AddAuthorizedKey function"""
KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
- # NOTE: The MD5 sums below were calculated after manually
- # checking the output files.
-
- def writeTestFile(self):
- (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
- f = os.fdopen(fd, 'w')
+ def setUp(self):
+ (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
try:
- f.write(TestSshKeys.KEY_A)
- f.write("\n")
- f.write(TestSshKeys.KEY_B)
- f.write("\n")
- finally:
- f.close()
+ handle = os.fdopen(fd, 'w')
+ try:
+ handle.write("%s\n" % TestSshKeys.KEY_A)
+ handle.write("%s\n" % TestSshKeys.KEY_B)
+ finally:
+ handle.close()
+ except:
+ utils.RemoveFile(self.tmpname)
+ raise
- return tmpname
+ def tearDown(self):
+ utils.RemoveFile(self.tmpname)
+ del self.tmpname
def testAddingNewKey(self):
- tmpname = self.writeTestFile()
- try:
- AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
+ AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- 'ccc71523108ca6e9d0343797dc3e9f16')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+ "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
def testAddingAlmostButNotCompletlyTheSameKey(self):
- tmpname = self.writeTestFile()
- try:
- AddAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
+ AddAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- 'f2c939d57addb5b3a6846884be896b46')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
def testAddingExistingKeyWithSomeMoreSpaces(self):
- tmpname = self.writeTestFile()
- try:
- AddAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
+ AddAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- '4e612764808bd46337eb0f575415fc30')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingExistingKeyWithSomeMoreSpaces(self):
- tmpname = self.writeTestFile()
- try:
- RemoveAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
+ RemoveAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- '77516d987fca07f70e30b830b3e4f2ed')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testRemovingNonExistingKey(self):
- tmpname = self.writeTestFile()
- try:
- RemoveAuthorizedKey(tmpname,
- 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
+ RemoveAuthorizedKey(self.tmpname,
+ 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- '4e612764808bd46337eb0f575415fc30')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
+ 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-class TestEtcHosts(unittest.TestCase):
+class TestEtcHosts(GanetiTestCase):
"""Test functions modifying /etc/hosts"""
- def writeTestFile(self):
- (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
- f = os.fdopen(fd, 'w')
+ def setUp(self):
+ (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
try:
- f.write('# This is a test file for /etc/hosts\n')
- f.write('127.0.0.1\tlocalhost\n')
- f.write('192.168.1.1 router gw\n')
- finally:
- f.close()
+ handle = os.fdopen(fd, 'w')
+ try:
+ handle.write('# This is a test file for /etc/hosts\n')
+ handle.write('127.0.0.1\tlocalhost\n')
+ handle.write('192.168.1.1 router gw\n')
+ finally:
+ handle.close()
+ except:
+ utils.RemoveFile(self.tmpname)
+ raise
- return tmpname
+ def tearDown(self):
+ utils.RemoveFile(self.tmpname)
+ del self.tmpname
- def testAddingNewIp(self):
- tmpname = self.writeTestFile()
- try:
- AddEtcHostsEntry(tmpname, 'myhost.domain.tld', '1.2.3.4')
+ def testSettingNewIp(self):
+ SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- '00e0e88250482e7449743c89a49e9349')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router gw\n"
+ "1.2.3.4\tmyhost.domain.tld myhost\n")
- def testAddingExistingIp(self):
- tmpname = self.writeTestFile()
- try:
- AddEtcHostsEntry(tmpname, 'myhost.domain.tld', '192.168.1.1')
+ def testSettingExistingIp(self):
+ SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
+ ['myhost'])
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- '4dc04c0acdd247175e0b980c6beea822')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1\tmyhost.domain.tld myhost\n")
def testRemovingExistingHost(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'router')
+ RemoveEtcHostsEntry(self.tmpname, 'router')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- '7d1e7a559eedbc25e0dff67d33ccac84')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 gw\n")
def testRemovingSingleExistingHost(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'localhost')
+ RemoveEtcHostsEntry(self.tmpname, 'localhost')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- 'ec4e4589b56f82fdb88db5675de011b1')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "192.168.1.1 router gw\n")
def testRemovingNonExistingHost(self):
- tmpname = self.writeTestFile()
- try:
- RemoveEtcHostsEntry(tmpname, 'myhost')
+ RemoveEtcHostsEntry(self.tmpname, 'myhost')
- f = open(tmpname, 'r')
- try:
- self.assertEqual(md5.new(f.read(8192)).hexdigest(),
- 'aa005bddc6d9ee399c296953f194929e')
- finally:
- f.close()
- finally:
- os.unlink(tmpname)
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router gw\n")
+
+ def testRemovingAlias(self):
+ RemoveEtcHostsEntry(self.tmpname, 'gw')
+
+ self.assertFileContent(self.tmpname,
+ "# This is a test file for /etc/hosts\n"
+ "127.0.0.1\tlocalhost\n"
+ "192.168.1.1 router\n")
class TestShellQuoting(unittest.TestCase):