# Copyright (C) 2006, 2007, 2010 Google Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Script for unittesting the bdev module"""
28 from ganeti import bdev
29 from ganeti import errors
34 class TestBaseDRBD(testutils.GanetiTestCase):
35 def testGetVersion(self):
37 ["version: 8.0.12 (api:76/proto:86-91)"],
38 ["version: 8.2.7 (api:88/proto:0-100)"],
39 ["version: 8.3.7.49 (api:188/proto:13-191)"],
67 for d,r in zip(data, result):
68 self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
class TestDRBD8Runner(testutils.GanetiTestCase):
  """Testing case for DRBD8

  Feeds canned "drbdsetup show" outputs to bdev.DRBD8._GetDevInfo and checks
  which local-disk and network keys end up in the parsed result.

  """

  @staticmethod
  def _has_disk(data, dname, mname):
    """Check local disk correctness

    @param data: parsed device information (supports "in" and indexing)
    @param dname: expected value of the "local_dev" entry
    @param mname: expected value of the "meta_dev" entry
    @return: True only if "local_dev", "meta_dev" and "meta_index" are all
        present, the first two match the expected names and the meta index
        is 0

    """
    return ("local_dev" in data and
            data["local_dev"] == dname and
            "meta_dev" in data and
            data["meta_dev"] == mname and
            "meta_index" in data and
            data["meta_index"] == 0)

  @staticmethod
  def _has_net(data, local, remote):
    """Check network connection parameters

    @param data: parsed device information (supports "in" and indexing)
    @param local: expected value of the "local_addr" entry
    @param remote: expected value of the "remote_addr" entry
    @return: True only if both address entries are present and equal to the
        expected values

    """
    return ("local_addr" in data and
            data["local_addr"] == local and
            "remote_addr" in data and
            data["remote_addr"] == remote)

  def testParserCreation(self):
    """Test drbdsetup show parser creation"""
    # No assertion needed: the test passes as long as building the parser
    # does not raise.
    bdev.DRBD8._GetShowParser()

  def testParser80(self):
    """Test drbdsetup show parser for disk and network version 8.0"""
    data = self._ReadTestData("bdev-drbd-8.0.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    self.assertTrue(self._has_net(result, ("192.0.2.1", 11000),
                                  ("192.0.2.2", 11000)),
                    "Wrong network info (8.0.x)")

  def testParser83(self):
    """Test drbdsetup show parser for disk and network version 8.3"""
    data = self._ReadTestData("bdev-drbd-8.3.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # The failure message names the version under test (it used to say 8.0.x,
    # copied from testParser80).
    self.assertTrue(self._has_net(result, ("192.0.2.1", 11000),
                                  ("192.0.2.2", 11000)),
                    "Wrong network info (8.3.x)")

  def testParserNetIP4(self):
    """Test drbdsetup show parser for IPv4 network"""
    data = self._ReadTestData("bdev-drbd-net-ip4.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # Network-only sample: none of the local disk keys may show up.
    self.assertTrue(("local_dev" not in result and
                     "meta_dev" not in result and
                     "meta_index" not in result),
                    "Should not find local disk info")
    self.assertTrue(self._has_net(result, ("192.0.2.1", 11002),
                                  ("192.0.2.2", 11002)),
                    "Wrong network info (IPv4)")

  def testParserNetIP6(self):
    """Test drbdsetup show parser for IPv6 network"""
    data = self._ReadTestData("bdev-drbd-net-ip6.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # Network-only sample: none of the local disk keys may show up.
    self.assertTrue(("local_dev" not in result and
                     "meta_dev" not in result and
                     "meta_index" not in result),
                    "Should not find local disk info")
    self.assertTrue(self._has_net(result, ("2001:db8:65::1", 11048),
                                  ("2001:db8:66::1", 11048)),
                    "Wrong network info (IPv6)")

  def testParserDisk(self):
    """Test drbdsetup show parser for disk"""
    data = self._ReadTestData("bdev-drbd-disk.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # Disk-only sample: neither address key may show up.
    self.assertTrue(("local_addr" not in result and
                     "remote_addr" not in result),
                    "Should not find network info")
class TestDRBD8Status(testutils.GanetiTestCase):
  """Testing case for DRBD8 /proc status

  setUp loads several canned /proc samples through bdev.DRBD8._GetProcData
  and bdev.DRBD8._MassageProcData; the tests then build bdev.DRBD8Status
  objects from individual entries and check their status flags.

  """

  def setUp(self):
    """Read in txt data"""
    testutils.GanetiTestCase.setUp(self)
    proc_data = self._TestDataFilename("proc_drbd8.txt")
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
    proc83_sync_data = self._TestDataFilename("proc_drbd83_sync.txt")
    proc83_sync_krnl_data = \
      self._TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
    # Raw contents of each sample file
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
    self.proc83_sync_data = bdev.DRBD8._GetProcData(filename=proc83_sync_data)
    self.proc83_sync_krnl_data = \
      bdev.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
    # Massaged form; the tests index it by integer (0, 1, 2, ...) and use
    # "in" to check for absent entries.
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
    self.mass83_sync_data = bdev.DRBD8._MassageProcData(self.proc83_sync_data)
    self.mass83_sync_krnl_data = \
      bdev.DRBD8._MassageProcData(self.proc83_sync_krnl_data)

  def testIOErrors(self):
    """Test handling of errors while reading the proc file."""
    temp_file = self._CreateTempFile()
    # NOTE(review): nothing visible here removes or fills temp_file before it
    # is read; confirm which condition is expected to trigger the error.
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8._GetProcData, filename=temp_file)

  def testHelper(self):
    """Test reading usermode_helper in /sys."""
    sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
    self.assertEqual(drbd_helper, "/bin/true")

  def testHelperIOErrors(self):
    """Test handling of errors while reading usermode_helper in /sys."""
    temp_file = self._CreateTempFile()
    # NOTE(review): nothing visible here removes or fills temp_file before it
    # is read; confirm which condition is expected to trigger the error.
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8.GetUsermodeHelper, filename=temp_file)

  def testMinorNotFound(self):
    """Test not-found-minor in /proc"""
    self.assertTrue(9 not in self.mass_data)
    self.assertTrue(9 not in self.mass83_data)
    self.assertTrue(3 not in self.mass80e_data)

  def testLineNotMatch(self):
    """Test wrong line passed to DRBD8Status"""
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")

  def testMinor0(self):
    """Test connected, primary device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[0])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_primary and
                      stats.peer_secondary and stats.is_disk_uptodate)

  def testMinor1(self):
    """Test connected, secondary device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[1])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_secondary and
                      stats.peer_primary and stats.is_disk_uptodate)

  def testMinor2(self):
    """Test unconfigured device"""
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
      stats = bdev.DRBD8Status(data[2])
      self.assertFalse(stats.is_in_use)

  def testMinor4(self):
    """Test WFconn device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[4])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_wfconn and stats.is_primary and
                      stats.rrole == "Unknown" and
                      stats.is_disk_uptodate)

  def testMinor6(self):
    """Test diskless device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[6])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_secondary and
                      stats.peer_primary and stats.is_diskless)

  def testMinor8(self):
    """Test standalone device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[8])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_standalone and
                      stats.rrole == "Unknown" and
                      stats.is_disk_uptodate)

  def testDRBD83SyncFine(self):
    """Test resync detection on the proc_drbd83_sync.txt sample"""
    stats = bdev.DRBD8Status(self.mass83_sync_data[3])
    self.assertTrue(stats.is_in_resync)
    self.assertTrue(stats.sync_percent is not None)

  def testDRBD83SyncBroken(self):
    """Test resync detection on the proc_drbd83_sync_krnl2.6.39.txt sample"""
    stats = bdev.DRBD8Status(self.mass83_sync_krnl_data[3])
    self.assertTrue(stats.is_in_resync)
    self.assertTrue(stats.sync_percent is not None)
# Run the test program only when this file is executed as a script.
if __name__ == "__main__":
  testutils.GanetiTestProgram()