4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
28 from ganeti import bdev
29 from ganeti import errors
34 class TestBaseDRBD(testutils.GanetiTestCase):
35 def testGetVersion(self):
37 ["version: 8.0.12 (api:76/proto:86-91)"],
38 ["version: 8.2.7 (api:88/proto:0-100)"],
39 ["version: 8.3.7.49 (api:188/proto:13-191)"],
67 for d,r in zip(data, result):
68 self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
71 class TestDRBD8Runner(testutils.GanetiTestCase):
72 """Testing case for DRBD8"""
def _has_disk(data, dname, mname):
  """Check local disk correctness.

  Returns True only when C{data} contains "local_dev" equal to C{dname},
  "meta_dev" equal to C{mname} and "meta_index" equal to 0; any missing
  key makes the result False.

  """
  # NOTE(review): this helper takes no self but is called as
  # self._has_disk(...) by the parser tests, so it has to be bound as a
  # static method; the decorator is not visible in this view -- confirm.
  return (
    "local_dev" in data and
    data["local_dev"] == dname and
    "meta_dev" in data and
    data["meta_dev"] == mname and
    "meta_index" in data and
    data["meta_index"] == 0
    )
def _has_net(data, local, remote):
  """Check network connection parameters.

  Returns True only when C{data} contains "local_addr" equal to C{local}
  and "remote_addr" equal to C{remote}; a missing key makes the result
  False.

  """
  # NOTE(review): this helper takes no self but is called as
  # self._has_net(...) by the parser tests, so it has to be bound as a
  # static method; the decorator is not visible in this view -- confirm.
  return (
    "local_addr" in data and
    data["local_addr"] == local and
    "remote_addr" in data and
    data["remote_addr"] == remote
    )
def testParserCreation(self):
  """Building the drbdsetup show parser must not raise."""
  # The returned parser is not inspected; the call completing without an
  # exception is the whole check.
  build_parser = bdev.DRBD8._GetShowParser
  build_parser()
def testParser80(self):
  """Test drbdsetup show parser for disk and network version 8.0.

  Parses the "bdev-drbd-8.0.txt" fixture and checks both the local disk
  devices and the local/remote network endpoints in the result.

  """
  data = self._ReadTestData("bdev-drbd-8.0.txt")
  result = bdev.DRBD8._GetDevInfo(data)
  self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                 "/dev/xenvg/test.meta"),
                  "Wrong local disk info")
  self.assertTrue(self._has_net(result, ("192.0.2.1", 11000),
                                ("192.0.2.2", 11000)),
                  "Wrong network info (8.0.x)")
def testParser83(self):
  """Test drbdsetup show parser for disk and network version 8.3.

  Parses the "bdev-drbd-8.3.txt" fixture and checks both the local disk
  devices and the local/remote network endpoints in the result.

  """
  data = self._ReadTestData("bdev-drbd-8.3.txt")
  result = bdev.DRBD8._GetDevInfo(data)
  self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                 "/dev/xenvg/test.meta"),
                  "Wrong local disk info")
  # The failure message names 8.3.x so a failure here is not mistaken
  # for one in the 8.0 parser test.
  self.assertTrue(self._has_net(result, ("192.0.2.1", 11000),
                                ("192.0.2.2", 11000)),
                  "Wrong network info (8.3.x)")
def testParserNetIP4(self):
  """Test drbdsetup show parser for IPv4 network.

  The "bdev-drbd-net-ip4.txt" fixture must yield network endpoints but
  none of the local disk keys.

  """
  data = self._ReadTestData("bdev-drbd-net-ip4.txt")
  result = bdev.DRBD8._GetDevInfo(data)
  self.assertTrue(("local_dev" not in result and
                   "meta_dev" not in result and
                   "meta_index" not in result),
                  "Should not find local disk info")
  self.assertTrue(self._has_net(result, ("192.0.2.1", 11002),
                                ("192.0.2.2", 11002)),
                  "Wrong network info (IPv4)")
def testParserNetIP6(self):
  """Test drbdsetup show parser for IPv6 network.

  The "bdev-drbd-net-ip6.txt" fixture must yield network endpoints but
  none of the local disk keys.

  """
  data = self._ReadTestData("bdev-drbd-net-ip6.txt")
  result = bdev.DRBD8._GetDevInfo(data)
  self.assertTrue(("local_dev" not in result and
                   "meta_dev" not in result and
                   "meta_index" not in result),
                  "Should not find local disk info")
  self.assertTrue(self._has_net(result, ("2001:db8:65::1", 11048),
                                ("2001:db8:66::1", 11048)),
                  "Wrong network info (IPv6)")
def testParserDisk(self):
  """Test drbdsetup show parser for disk.

  The "bdev-drbd-disk.txt" fixture must yield the local disk devices but
  neither of the network address keys.

  """
  data = self._ReadTestData("bdev-drbd-disk.txt")
  result = bdev.DRBD8._GetDevInfo(data)
  self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                 "/dev/xenvg/test.meta"),
                  "Wrong local disk info")
  self.assertTrue(("local_addr" not in result and
                   "remote_addr" not in result),
                  "Should not find network info")
160 class TestDRBD8Status(testutils.GanetiTestCase):
161 """Testing case for DRBD8 /proc status"""
def setUp(self):
  """Read in txt data.

  Runs the base class setUp, then loads three proc fixtures through
  bdev.DRBD8._GetProcData and passes each result through
  bdev.DRBD8._MassageProcData. The raw data is kept in self.proc_data,
  self.proc80e_data and self.proc83_data; the massaged data in
  self.mass_data, self.mass80e_data and self.mass83_data.

  """
  testutils.GanetiTestCase.setUp(self)
  proc_data = self._TestDataFilename("proc_drbd8.txt")
  proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
  proc83_data = self._TestDataFilename("proc_drbd83.txt")
  self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
  self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
  self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
  self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
  self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
  self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
def testIOErrors(self):
  """Test handling of errors while reading the proc file.

  bdev.DRBD8._GetProcData must raise errors.BlockDeviceError when pointed
  at the path returned by _CreateTempFile.

  """
  temp_file = self._CreateTempFile()
  # NOTE(review): only the temp-file creation and the assertion are
  # visible here; confirm whether the file is meant to be altered (e.g.
  # removed) before the read.
  self.assertRaises(errors.BlockDeviceError,
                    bdev.DRBD8._GetProcData, filename=temp_file)
def testHelper(self):
  """Test reading usermode_helper in /sys.

  The "sys_drbd_usermode_helper.txt" fixture must be read back as
  "/bin/true".

  """
  sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
  drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
  self.assertEqual(drbd_helper, "/bin/true")
def testHelperIOErrors(self):
  """Test handling of errors while reading usermode_helper in /sys.

  bdev.DRBD8.GetUsermodeHelper must raise errors.BlockDeviceError when
  pointed at the path returned by _CreateTempFile.

  """
  temp_file = self._CreateTempFile()
  # NOTE(review): only the temp-file creation and the assertion are
  # visible here; confirm whether the file is meant to be altered (e.g.
  # removed) before the read.
  self.assertRaises(errors.BlockDeviceError,
                    bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
def testMinorNotFound(self):
  """Test not-found-minor in /proc.

  Minor 9 must be absent from the massaged 8.x and 8.3 data, and minor 3
  from the massaged 8.0 empty-line data.

  """
  self.assertTrue(9 not in self.mass_data)
  self.assertTrue(9 not in self.mass83_data)
  self.assertTrue(3 not in self.mass80e_data)
def testLineNotMatch(self):
  """DRBD8Status must reject a line it cannot match."""
  bogus_line = "foo"
  self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, bogus_line)
def testMinor0(self):
  """Test connected, primary device.

  Minor 0 is checked in both the 8.x and the 8.3 massaged data.

  """
  for data in [self.mass_data, self.mass83_data]:
    stats = bdev.DRBD8Status(data[0])
    # One assertion per property, so a failure names the offending flag.
    self.assertTrue(stats.is_in_use)
    self.assertTrue(stats.is_connected)
    self.assertTrue(stats.is_primary)
    self.assertTrue(stats.peer_secondary)
    self.assertTrue(stats.is_disk_uptodate)
def testMinor1(self):
  """Test connected, secondary device.

  Minor 1 is checked in both the 8.x and the 8.3 massaged data.

  """
  for data in [self.mass_data, self.mass83_data]:
    stats = bdev.DRBD8Status(data[1])
    # One assertion per property, so a failure names the offending flag.
    self.assertTrue(stats.is_in_use)
    self.assertTrue(stats.is_connected)
    self.assertTrue(stats.is_secondary)
    self.assertTrue(stats.peer_primary)
    self.assertTrue(stats.is_disk_uptodate)
def testMinor2(self):
  """Test unconfigured device.

  Minor 2 must be reported as not in use in the 8.x, 8.3 and 8.0
  empty-line massaged data.

  """
  for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
    stats = bdev.DRBD8Status(data[2])
    self.assertFalse(stats.is_in_use)
def testMinor4(self):
  """Test WFconn device.

  Minor 4 is checked in both the 8.x and the 8.3 massaged data.

  """
  for data in [self.mass_data, self.mass83_data]:
    stats = bdev.DRBD8Status(data[4])
    # One assertion per property, so a failure names the offending flag.
    self.assertTrue(stats.is_in_use)
    self.assertTrue(stats.is_wfconn)
    self.assertTrue(stats.is_primary)
    self.assertEqual(stats.rrole, "Unknown")
    self.assertTrue(stats.is_disk_uptodate)
def testMinor6(self):
  """Test diskless device.

  Minor 6 is checked in both the 8.x and the 8.3 massaged data.

  """
  for data in [self.mass_data, self.mass83_data]:
    stats = bdev.DRBD8Status(data[6])
    # One assertion per property, so a failure names the offending flag.
    self.assertTrue(stats.is_in_use)
    self.assertTrue(stats.is_connected)
    self.assertTrue(stats.is_secondary)
    self.assertTrue(stats.peer_primary)
    self.assertTrue(stats.is_diskless)
def testMinor8(self):
  """Test standalone device.

  Minor 8 is checked in both the 8.x and the 8.3 massaged data.

  """
  for data in [self.mass_data, self.mass83_data]:
    stats = bdev.DRBD8Status(data[8])
    # One assertion per property, so a failure names the offending flag.
    self.assertTrue(stats.is_in_use)
    self.assertTrue(stats.is_standalone)
    self.assertEqual(stats.rrole, "Unknown")
    self.assertTrue(stats.is_disk_uptodate)
# Hand control to the Ganeti test program only when run as a script.
if "__main__" == __name__:
  testutils.GanetiTestProgram()