from ganeti import bdev
from ganeti import errors
+import testutils
-class TestDRBD8Runner(unittest.TestCase):
+
+class TestDRBD8Runner(testutils.GanetiTestCase):
"""Testing case for DRBD8"""
@staticmethod
return retval
@staticmethod
- def _get_contents(name):
- """Returns the contents of a file"""
-
- prefix = os.environ.get("srcdir", None)
- if prefix:
- name = prefix + "/" + name
- fh = open(name, "r")
- try:
- data = fh.read()
- finally:
- fh.close()
- return data
-
-
- @staticmethod
def _has_net(data, local, remote):
"""Check network connection parameters"""
retval = (
"""Test drbdsetup show parser creation"""
bdev.DRBD8._GetShowParser()
- def testParserBoth(self):
+ def testParserBoth80(self):
"""Test drbdsetup show parser for disk and network"""
- data = self._get_contents("data/bdev-both.txt")
+ data = self._ReadTestData("bdev-both.txt")
result = bdev.DRBD8._GetDevInfo(data)
self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
"/dev/xenvg/test.meta"),
"Wrong local disk info")
self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
("192.168.1.2", 11000)),
- "Wrong network info")
+ "Wrong network info (8.0.x)")
+
+  def testParserBoth83(self):
+    """Test drbdsetup show parser for disk and network (8.3 test data)"""
+    data = self._ReadTestData("bdev-8.3-both.txt")
+    result = bdev.DRBD8._GetDevInfo(data)
+    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
+                                   "/dev/xenvg/test.meta"),
+                    "Wrong local disk info")
+    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
+                                  ("192.168.1.2", 11000)),
+                    "Wrong network info (8.3.x)")
def testParserNet(self):
"""Test drbdsetup show parser for disk and network"""
- data = self._get_contents("data/bdev-net.txt")
+ data = self._ReadTestData("bdev-net.txt")
result = bdev.DRBD8._GetDevInfo(data)
self.failUnless(("local_dev" not in result and
"meta_dev" not in result and
def testParserDisk(self):
"""Test drbdsetup show parser for disk and network"""
- data = self._get_contents("data/bdev-disk.txt")
+ data = self._ReadTestData("bdev-disk.txt")
result = bdev.DRBD8._GetDevInfo(data)
self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
"/dev/xenvg/test.meta"),
"Should not find network info")
-class TestDRBD8Status(unittest.TestCase):
+class TestDRBD8Status(testutils.GanetiTestCase):
"""Testing case for DRBD8 /proc status"""
def setUp(self):
"""Read in txt data"""
- self.proc_data = bdev.DRBD8._GetProcData(filename="data/proc_drbd8.txt")
+ testutils.GanetiTestCase.setUp(self)
+ proc_data = self._TestDataFilename("proc_drbd8.txt")
+ proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
+ proc83_data = self._TestDataFilename("proc_drbd83.txt")
+ self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
+ self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
+ self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
+ self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
+ self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
+
+  def testIOErrors(self):
+    """Reading a missing proc file must raise BlockDeviceError."""
+    missing_path = self._CreateTempFile()
+    # Delete the file right away so the path no longer exists when read
+    os.unlink(missing_path)
+    self.failUnlessRaises(errors.BlockDeviceError,
+                          bdev.DRBD8._GetProcData, filename=missing_path)
def testMinorNotFound(self):
"""Test not-found-minor in /proc"""
self.failUnless(9 not in self.mass_data)
+ self.failUnless(9 not in self.mass83_data)
+ self.failUnless(3 not in self.mass80e_data)
def testLineNotMatch(self):
"""Test wrong line passed to DRBD8Status"""
def testMinor0(self):
"""Test connected, primary device"""
- stats = bdev.DRBD8Status(self.mass_data[0])
- self.failUnless(stats.is_connected and stats.is_primary and
- stats.peer_secondary and stats.is_disk_uptodate)
+ for data in [self.mass_data, self.mass83_data]:
+ stats = bdev.DRBD8Status(data[0])
+ self.failUnless(stats.is_in_use)
+ self.failUnless(stats.is_connected and stats.is_primary and
+ stats.peer_secondary and stats.is_disk_uptodate)
def testMinor1(self):
"""Test connected, secondary device"""
- stats = bdev.DRBD8Status(self.mass_data[1])
- self.failUnless(stats.is_connected and stats.is_secondary and
- stats.peer_primary and stats.is_disk_uptodate)
+ for data in [self.mass_data, self.mass83_data]:
+ stats = bdev.DRBD8Status(data[1])
+ self.failUnless(stats.is_in_use)
+ self.failUnless(stats.is_connected and stats.is_secondary and
+ stats.peer_primary and stats.is_disk_uptodate)
+
+  def testMinor2(self):
+    """Minor 2 must be reported as not in use in every data set"""
+    datasets = (self.mass_data, self.mass83_data, self.mass80e_data)
+    for massaged in datasets:
+      status = bdev.DRBD8Status(massaged[2])
+      self.failIf(status.is_in_use)
def testMinor4(self):
"""Test WFconn device"""
- stats = bdev.DRBD8Status(self.mass_data[4])
- self.failUnless(stats.is_wfconn and stats.is_primary and
- stats.rrole == 'Unknown' and
- stats.is_disk_uptodate)
+ for data in [self.mass_data, self.mass83_data]:
+ stats = bdev.DRBD8Status(data[4])
+ self.failUnless(stats.is_in_use)
+ self.failUnless(stats.is_wfconn and stats.is_primary and
+ stats.rrole == 'Unknown' and
+ stats.is_disk_uptodate)
def testMinor6(self):
"""Test diskless device"""
- stats = bdev.DRBD8Status(self.mass_data[6])
- self.failUnless(stats.is_connected and stats.is_secondary and
- stats.peer_primary and stats.is_diskless)
+ for data in [self.mass_data, self.mass83_data]:
+ stats = bdev.DRBD8Status(data[6])
+ self.failUnless(stats.is_in_use)
+ self.failUnless(stats.is_connected and stats.is_secondary and
+ stats.peer_primary and stats.is_diskless)
def testMinor8(self):
"""Test standalone device"""
- stats = bdev.DRBD8Status(self.mass_data[8])
- self.failUnless(stats.is_standalone and
- stats.rrole == 'Unknown' and
- stats.is_disk_uptodate)
+ for data in [self.mass_data, self.mass83_data]:
+ stats = bdev.DRBD8Status(data[8])
+ self.failUnless(stats.is_in_use)
+ self.failUnless(stats.is_standalone and
+ stats.rrole == 'Unknown' and
+ stats.is_disk_uptodate)
if __name__ == '__main__':
- unittest.main()
+ testutils.GanetiTestProgram()