4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
28 from ganeti import bdev
29 from ganeti import errors
34 class TestDRBD8Runner(testutils.GanetiTestCase):
35 """Testing case for DRBD8"""
38 def _has_disk(data, dname, mname):
39 """Check local disk corectness"""
41 "local_dev" in data and
42 data["local_dev"] == dname and
43 "meta_dev" in data and
44 data["meta_dev"] == mname and
45 "meta_index" in data and
46 data["meta_index"] == 0
51 def _has_net(data, local, remote):
52 """Check network connection parameters"""
54 "local_addr" in data and
55 data["local_addr"] == local and
56 "remote_addr" in data and
57 data["remote_addr"] == remote
61 def testParserCreation(self):
62 """Test drbdsetup show parser creation"""
63 bdev.DRBD8._GetShowParser()
65 def testParserBoth80(self):
66 """Test drbdsetup show parser for disk and network"""
67 data = self._ReadTestData("bdev-both.txt")
68 result = bdev.DRBD8._GetDevInfo(data)
69 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
70 "/dev/xenvg/test.meta"),
71 "Wrong local disk info")
72 self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
73 ("192.168.1.2", 11000)),
74 "Wrong network info (8.0.x)")
76 def testParserBoth83(self):
77 """Test drbdsetup show parser for disk and network"""
78 data = self._ReadTestData("bdev-8.3-both.txt")
79 result = bdev.DRBD8._GetDevInfo(data)
80 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
81 "/dev/xenvg/test.meta"),
82 "Wrong local disk info")
83 self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
84 ("192.168.1.2", 11000)),
85 "Wrong network info (8.2.x)")
87 def testParserNet(self):
88 """Test drbdsetup show parser for disk and network"""
89 data = self._ReadTestData("bdev-net.txt")
90 result = bdev.DRBD8._GetDevInfo(data)
91 self.failUnless(("local_dev" not in result and
92 "meta_dev" not in result and
93 "meta_index" not in result),
94 "Should not find local disk info")
95 self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
96 ("192.168.1.2", 11002)),
99 def testParserDisk(self):
100 """Test drbdsetup show parser for disk and network"""
101 data = self._ReadTestData("bdev-disk.txt")
102 result = bdev.DRBD8._GetDevInfo(data)
103 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
104 "/dev/xenvg/test.meta"),
105 "Wrong local disk info")
106 self.failUnless(("local_addr" not in result and
107 "remote_addr" not in result),
108 "Should not find network info")
111 class TestDRBD8Status(testutils.GanetiTestCase):
112 """Testing case for DRBD8 /proc status"""
115 """Read in txt data"""
116 testutils.GanetiTestCase.setUp(self)
117 proc_data = self._TestDataFilename("proc_drbd8.txt")
118 proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
119 proc83_data = self._TestDataFilename("proc_drbd83.txt")
120 self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
121 self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
122 self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
123 self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
124 self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
125 self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
127 def testIOErrors(self):
128 """Test handling of errors while reading the proc file."""
129 temp_file = self._CreateTempFile()
131 self.failUnlessRaises(errors.BlockDeviceError,
132 bdev.DRBD8._GetProcData, filename=temp_file)
134 def testMinorNotFound(self):
135 """Test not-found-minor in /proc"""
136 self.failUnless(9 not in self.mass_data)
137 self.failUnless(9 not in self.mass83_data)
138 self.failUnless(3 not in self.mass80e_data)
140 def testLineNotMatch(self):
141 """Test wrong line passed to DRBD8Status"""
142 self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
144 def testMinor0(self):
145 """Test connected, primary device"""
146 for data in [self.mass_data, self.mass83_data]:
147 stats = bdev.DRBD8Status(data[0])
148 self.failUnless(stats.is_in_use)
149 self.failUnless(stats.is_connected and stats.is_primary and
150 stats.peer_secondary and stats.is_disk_uptodate)
152 def testMinor1(self):
153 """Test connected, secondary device"""
154 for data in [self.mass_data, self.mass83_data]:
155 stats = bdev.DRBD8Status(data[1])
156 self.failUnless(stats.is_in_use)
157 self.failUnless(stats.is_connected and stats.is_secondary and
158 stats.peer_primary and stats.is_disk_uptodate)
160 def testMinor2(self):
161 """Test unconfigured device"""
162 for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
163 stats = bdev.DRBD8Status(data[2])
164 self.failIf(stats.is_in_use)
166 def testMinor4(self):
167 """Test WFconn device"""
168 for data in [self.mass_data, self.mass83_data]:
169 stats = bdev.DRBD8Status(data[4])
170 self.failUnless(stats.is_in_use)
171 self.failUnless(stats.is_wfconn and stats.is_primary and
172 stats.rrole == 'Unknown' and
173 stats.is_disk_uptodate)
175 def testMinor6(self):
176 """Test diskless device"""
177 for data in [self.mass_data, self.mass83_data]:
178 stats = bdev.DRBD8Status(data[6])
179 self.failUnless(stats.is_in_use)
180 self.failUnless(stats.is_connected and stats.is_secondary and
181 stats.peer_primary and stats.is_diskless)
183 def testMinor8(self):
184 """Test standalone device"""
185 for data in [self.mass_data, self.mass83_data]:
186 stats = bdev.DRBD8Status(data[8])
187 self.failUnless(stats.is_in_use)
188 self.failUnless(stats.is_standalone and
189 stats.rrole == 'Unknown' and
190 stats.is_disk_uptodate)
192 if __name__ == '__main__':
193 testutils.GanetiTestProgram()