#!/usr/bin/python
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import os
import unittest
-import testutils
from ganeti import bdev
from ganeti import errors
+import testutils
+
+
+class TestBaseDRBD(testutils.GanetiTestCase):
+ """Tests for the version parsing done by bdev.BaseDRBD"""
+ def testGetVersion(self):
+ """Check the dictionary returned by _GetVersion for sample version lines"""
+ # Each entry is handed to _GetVersion as a one-element list of lines.
+ data = [
+ ["version: 8.0.12 (api:76/proto:86-91)"],
+ ["version: 8.2.7 (api:88/proto:0-100)"],
+ ["version: 8.3.7.49 (api:188/proto:13-191)"],
+ ]
+ # Expected results, in the same order as the entries of data. Note that
+ # "proto2" is expected as a string while the other values are integers,
+ # and that the fourth component of "8.3.7.49" is not part of the result.
+ result = [
+ {
+ "k_major": 8,
+ "k_minor": 0,
+ "k_point": 12,
+ "api": 76,
+ "proto": 86,
+ "proto2": "91",
+ },
+ {
+ "k_major": 8,
+ "k_minor": 2,
+ "k_point": 7,
+ "api": 88,
+ "proto": 0,
+ "proto2": "100",
+ },
+ {
+ "k_major": 8,
+ "k_minor": 3,
+ "k_point": 7,
+ "api": 188,
+ "proto": 13,
+ "proto2": "191",
+ }
+ ]
+ for d,r in zip(data, result):
+ self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
+
class TestDRBD8Runner(testutils.GanetiTestCase):
"""Testing case for DRBD8"""
"""Test drbdsetup show parser creation"""
bdev.DRBD8._GetShowParser()
- def testParserBoth80(self):
- """Test drbdsetup show parser for disk and network"""
- data = self._ReadTestData("bdev-both.txt")
+ def testParser80(self):
+ """Test drbdsetup show parser for disk and network version 8.0"""
+ data = self._ReadTestData("bdev-drbd-8.0.txt")
result = bdev.DRBD8._GetDevInfo(data)
+ # Both the local disk pair and the two network endpoints must be reported.
self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
"/dev/xenvg/test.meta"),
"Wrong local disk info")
- self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
- ("192.168.1.2", 11000)),
+ self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
+ ("192.0.2.2", 11000)),
"Wrong network info (8.0.x)")
- def testParserBoth83(self):
- """Test drbdsetup show parser for disk and network"""
- data = self._ReadTestData("bdev-8.3-both.txt")
+ def testParser83(self):
+ """Test drbdsetup show parser for disk and network version 8.3"""
+ data = self._ReadTestData("bdev-drbd-8.3.txt")
result = bdev.DRBD8._GetDevInfo(data)
self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
"/dev/xenvg/test.meta"),
"Wrong local disk info")
- self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
- ("192.168.1.2", 11000)),
- "Wrong network info (8.2.x)")
+ # The failure message names 8.3.x so that a failure here cannot be
+ # mistaken for one coming from the 8.0 parser test.
+ self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
+ ("192.0.2.2", 11000)),
+ "Wrong network info (8.3.x)")
+
+ def testParserNetIP4(self):
+ """Test drbdsetup show parser for IPv4 network"""
+ data = self._ReadTestData("bdev-drbd-net-ip4.txt")
+ result = bdev.DRBD8._GetDevInfo(data)
+ # Network-only data: none of the local disk keys may show up in the result.
+ self.failUnless(("local_dev" not in result and
+ "meta_dev" not in result and
+ "meta_index" not in result),
+ "Should not find local disk info")
+ self.failUnless(self._has_net(result, ("192.0.2.1", 11002),
+ ("192.0.2.2", 11002)),
+ "Wrong network info (IPv4)")
- def testParserNet(self):
- """Test drbdsetup show parser for disk and network"""
- data = self._ReadTestData("bdev-net.txt")
+ def testParserNetIP6(self):
+ """Test drbdsetup show parser for IPv6 network"""
+ data = self._ReadTestData("bdev-drbd-net-ip6.txt")
result = bdev.DRBD8._GetDevInfo(data)
+ # Network-only data: none of the local disk keys may show up in the result.
self.failUnless(("local_dev" not in result and
"meta_dev" not in result and
"meta_index" not in result),
"Should not find local disk info")
- self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
- ("192.168.1.2", 11002)),
- "Wrong network info")
+ self.failUnless(self._has_net(result, ("2001:db8:65::1", 11048),
+ ("2001:db8:66::1", 11048)),
+ "Wrong network info (IPv6)")
def testParserDisk(self):
- """Test drbdsetup show parser for disk and network"""
- data = self._ReadTestData("bdev-disk.txt")
+ """Test drbdsetup show parser for disk"""
+ data = self._ReadTestData("bdev-drbd-disk.txt")
result = bdev.DRBD8._GetDevInfo(data)
self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
"/dev/xenvg/test.meta"),
"""Read in txt data"""
testutils.GanetiTestCase.setUp(self)
proc_data = self._TestDataFilename("proc_drbd8.txt")
+ proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
proc83_data = self._TestDataFilename("proc_drbd83.txt")
+ proc83_sync_data = self._TestDataFilename("proc_drbd83_sync.txt")
+ proc83_sync_krnl_data = \
+ self._TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
+ self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
+ self.proc83_sync_data = bdev.DRBD8._GetProcData(filename=proc83_sync_data)
+ self.proc83_sync_krnl_data = \
+ bdev.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
+ self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
+ self.mass83_sync_data = bdev.DRBD8._MassageProcData(self.proc83_sync_data)
+ self.mass83_sync_krnl_data = \
+ bdev.DRBD8._MassageProcData(self.proc83_sync_krnl_data)
def testIOErrors(self):
"""Test handling of errors while reading the proc file."""
self.failUnlessRaises(errors.BlockDeviceError,
bdev.DRBD8._GetProcData, filename=temp_file)
+ def testHelper(self):
+ """Test reading usermode_helper in /sys."""
+ sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
+ drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
+ # The test data file is expected to yield "/bin/true" as the helper.
+ self.failUnlessEqual(drbd_helper, "/bin/true")
+
+ def testHelperIOErrors(self):
+ """Test handling of errors while reading usermode_helper in /sys."""
+ temp_file = self._CreateTempFile()
+ # Remove the file right away so the path no longer exists when it is read.
+ os.unlink(temp_file)
+ self.failUnlessRaises(errors.BlockDeviceError,
+ bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
+
def testMinorNotFound(self):
"""Test not-found-minor in /proc"""
self.failUnless(9 not in self.mass_data)
self.failUnless(9 not in self.mass83_data)
+ # Same check against the data read from proc_drbd80-emptyline.txt, where
+ # minor 3 is the one expected to be missing.
+ self.failUnless(3 not in self.mass80e_data)
def testLineNotMatch(self):
"""Test wrong line passed to DRBD8Status"""
def testMinor2(self):
"""Test unconfigured device"""
- for data in [self.mass_data, self.mass83_data]:
+ for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
stats = bdev.DRBD8Status(data[2])
self.failIf(stats.is_in_use)
stats.rrole == 'Unknown' and
stats.is_disk_uptodate)
+ def testDRBD83SyncFine(self):
+ """Test resync status parsing with the proc_drbd83_sync.txt data"""
+ stats = bdev.DRBD8Status(self.mass83_sync_data[3])
+ # Minor 3 must be reported as resyncing with a known sync percentage.
+ self.failUnless(stats.is_in_resync)
+ self.failUnless(stats.sync_percent is not None)
+
+ def testDRBD83SyncBroken(self):
+ """Test resync status parsing with the proc_drbd83_sync_krnl2.6.39.txt data"""
+ # NOTE(review): presumably the /proc/drbd layout differs on this kernel,
+ # hence "broken" in the test name -- confirm against the test data file.
+ stats = bdev.DRBD8Status(self.mass83_sync_krnl_data[3])
+ self.failUnless(stats.is_in_resync)
+ self.failUnless(stats.sync_percent is not None)
+
if __name__ == '__main__':
- unittest.main()
+ testutils.GanetiTestProgram()