from ganeti import bdev
from ganeti import errors
+from ganeti import constants
import testutils
"remote_addr" not in result),
"Should not find network info")
+ def testBarriersOptions(self):
+ """Test class method that generates drbdsetup options for disk barriers"""
+ # Tests that should fail because of wrong version/options combinations
+ should_fail = [
+ (8, 0, 12, "bfd", True),
+ (8, 0, 12, "fd", False),
+ (8, 0, 12, "b", True),
+ (8, 2, 7, "bfd", True),
+ (8, 2, 7, "b", True)
+ ]
+
+ for vmaj, vmin, vrel, opts, meta in should_fail:
+ self.assertRaises(errors.BlockDeviceError,
+ bdev.DRBD8._ComputeDiskBarrierArgs,
+ vmaj, vmin, vrel, opts, meta)
+
+ # get the valid options from the frozenset(frozenset()) in constants.
+ valid_options = [list(x)[0] for x in constants.DRBD_VALID_BARRIER_OPT]
+
+ # Versions that do not support anything
+ for vmaj, vmin, vrel in ((8, 0, 0), (8, 0, 11), (8, 2, 6)):
+ for opts in valid_options:
+ self.assertRaises(errors.BlockDeviceError,
+ bdev.DRBD8._ComputeDiskBarrierArgs,
+ vmaj, vmin, vrel, opts, True)
+
+ # Versions with partial support (testing only options that are supported)
+ tests = [
+ (8, 0, 12, "n", False, []),
+ (8, 0, 12, "n", True, ["--no-md-flushes"]),
+ (8, 2, 7, "n", False, []),
+ (8, 2, 7, "fd", False, ["--no-disk-flushes", "--no-disk-drain"]),
+ (8, 0, 12, "n", True, ["--no-md-flushes"]),
+ ]
+
+ # Versions that support everything
+ for vmaj, vmin, vrel in ((8, 3, 0), (8, 3, 12)):
+ tests.append((vmaj, vmin, vrel, "bfd", True,
+ ["--no-disk-barrier", "--no-disk-drain",
+ "--no-disk-flushes", "--no-md-flushes"]))
+ tests.append((vmaj, vmin, vrel, "n", False, []))
+ tests.append((vmaj, vmin, vrel, "b", True,
+ ["--no-disk-barrier", "--no-md-flushes"]))
+ tests.append((vmaj, vmin, vrel, "fd", False,
+ ["--no-disk-flushes", "--no-disk-drain"]))
+ tests.append((vmaj, vmin, vrel, "n", True, ["--no-md-flushes"]))
+
+ # Test execution
+ for test in tests:
+ vmaj, vmin, vrel, disabled_barriers, disable_meta_flush, expected = test
+ args = \
+ bdev.DRBD8._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
+ disabled_barriers,
+ disable_meta_flush)
+ self.failUnless(set(args) == set(expected),
+ "For test %s, got wrong results %s" % (test, args))
+
+ # Unsupported or invalid versions
+ for vmaj, vmin, vrel in ((0, 7, 25), (9, 0, 0), (7, 0, 0), (8, 4, 0)):
+ self.assertRaises(errors.BlockDeviceError,
+ bdev.DRBD8._ComputeDiskBarrierArgs,
+ vmaj, vmin, vrel, "n", True)
+
+ # Invalid options
+ for option in ("", "c", "whatever", "nbdfc", "nf"):
+ self.assertRaises(errors.BlockDeviceError,
+ bdev.DRBD8._ComputeDiskBarrierArgs,
+ 8, 3, 11, option, True)
+
class TestDRBD8Status(testutils.GanetiTestCase):
"""Testing case for DRBD8 /proc status"""
proc_data = self._TestDataFilename("proc_drbd8.txt")
proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
proc83_data = self._TestDataFilename("proc_drbd83.txt")
+ proc83_sync_data = self._TestDataFilename("proc_drbd83_sync.txt")
+ proc83_sync_krnl_data = \
+ self._TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
+ self.proc83_sync_data = bdev.DRBD8._GetProcData(filename=proc83_sync_data)
+ self.proc83_sync_krnl_data = \
+ bdev.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
+ self.mass83_sync_data = bdev.DRBD8._MassageProcData(self.proc83_sync_data)
+ self.mass83_sync_krnl_data = \
+ bdev.DRBD8._MassageProcData(self.proc83_sync_krnl_data)
def testIOErrors(self):
"""Test handling of errors while reading the proc file."""
stats.rrole == 'Unknown' and
stats.is_disk_uptodate)
+ def testDRBD83SyncFine(self):
+ stats = bdev.DRBD8Status(self.mass83_sync_data[3])
+ self.failUnless(stats.is_in_resync)
+ self.failUnless(stats.sync_percent is not None)
+
+ def testDRBD83SyncBroken(self):
+ stats = bdev.DRBD8Status(self.mass83_sync_krnl_data[3])
+ self.failUnless(stats.is_in_resync)
+ self.failUnless(stats.sync_percent is not None)
+
if __name__ == '__main__':
testutils.GanetiTestProgram()