4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
29 from ganeti import bdev
30 from ganeti import errors
class TestDRBD8Runner(testutils.GanetiTestCase):
  """Testing case for DRBD8.

  Exercises bdev.DRBD8._GetShowParser and bdev.DRBD8._GetDevInfo against
  the stored "drbdsetup show" test data files.
  """

  @staticmethod
  def _has_disk(data, dname, mname):
    """Check local disk correctness.

    Returns True only when data contains "local_dev" equal to dname,
    "meta_dev" equal to mname and "meta_index" equal to 0.
    """
    # Static: the tests call this as self._has_disk(result, ...), so no
    # implicit instance argument may be bound to "data".
    return ("local_dev" in data and
            data["local_dev"] == dname and
            "meta_dev" in data and
            data["meta_dev"] == mname and
            "meta_index" in data and
            data["meta_index"] == 0)

  @staticmethod
  def _has_net(data, local, remote):
    """Check network connection parameters.

    Returns True only when data contains "local_addr" equal to local and
    "remote_addr" equal to remote.
    """
    return ("local_addr" in data and
            data["local_addr"] == local and
            "remote_addr" in data and
            data["remote_addr"] == remote)

  def testParserCreation(self):
    """Test drbdsetup show parser creation"""
    # Succeeds as long as building the parser does not raise.
    bdev.DRBD8._GetShowParser()

  def testParserBoth80(self):
    """Test drbdsetup show parser for disk and network (bdev-both.txt)"""
    data = self._ReadTestData("bdev-both.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11000),
                                  ("192.168.1.2", 11000)),
                    "Wrong network info (8.0.x)")

  def testParserBoth83(self):
    """Test drbdsetup show parser for disk and network (bdev-8.3-both.txt)"""
    data = self._ReadTestData("bdev-8.3-both.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # NOTE(review): the message says 8.2.x while the test name and data file
    # say 8.3 -- confirm which version label is intended.
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11000),
                                  ("192.168.1.2", 11000)),
                    "Wrong network info (8.2.x)")

  def testParserNet(self):
    """Test drbdsetup show parser for a network-only configuration"""
    data = self._ReadTestData("bdev-net.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # None of the local disk keys may be reported for this data file.
    for key in ("local_dev", "meta_dev", "meta_index"):
      self.assertTrue(key not in result, "Should not find local disk info")
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11002),
                                  ("192.168.1.2", 11002)),
                    "Wrong network info")

  def testParserDisk(self):
    """Test drbdsetup show parser for a disk-only configuration"""
    data = self._ReadTestData("bdev-disk.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # Neither network key may be reported for this data file.
    for key in ("local_addr", "remote_addr"):
      self.assertTrue(key not in result, "Should not find network info")
class TestDRBD8Status(testutils.GanetiTestCase):
  """Testing case for DRBD8 /proc status.

  Every test works on the per-minor data prepared by setUp from the stored
  proc_drbd*.txt test data files.
  """

  def setUp(self):
    """Read in txt data.

    For each of the three data files, the raw lines are read through
    bdev.DRBD8._GetProcData and then passed to bdev.DRBD8._MassageProcData;
    both results are kept as instance attributes for the tests.
    """
    testutils.GanetiTestCase.setUp(self)
    proc_data = self._TestDataFilename("proc_drbd8.txt")
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)

  def testIOErrors(self):
    """Test handling of errors while reading the proc file."""
    temp_file = self._CreateTempFile()
    # NOTE(review): nothing visible here removes or fills temp_file before
    # the read; confirm _GetProcData is expected to reject it as-is.
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8._GetProcData, filename=temp_file)

  def testMinorNotFound(self):
    """Test not-found-minor in /proc"""
    self.assertTrue(9 not in self.mass_data)
    self.assertTrue(9 not in self.mass83_data)
    self.assertTrue(3 not in self.mass80e_data)

  def testLineNotMatch(self):
    """Test wrong line passed to DRBD8Status"""
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")

  def testMinor0(self):
    """Test connected, primary device (minor 0)"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[0])
      # One assertion per flag so a failure points at the offending one.
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected)
      self.assertTrue(stats.is_primary)
      self.assertTrue(stats.peer_secondary)
      self.assertTrue(stats.is_disk_uptodate)

  def testMinor1(self):
    """Test connected, secondary device (minor 1)"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[1])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected)
      self.assertTrue(stats.is_secondary)
      self.assertTrue(stats.peer_primary)
      self.assertTrue(stats.is_disk_uptodate)

  def testMinor2(self):
    """Test unconfigured device (minor 2)"""
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
      stats = bdev.DRBD8Status(data[2])
      self.assertFalse(stats.is_in_use)

  def testMinor4(self):
    """Test WFconn device (minor 4)"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[4])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_wfconn)
      self.assertTrue(stats.is_primary)
      self.assertEqual(stats.rrole, 'Unknown')
      self.assertTrue(stats.is_disk_uptodate)

  def testMinor6(self):
    """Test diskless device (minor 6)"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[6])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected)
      self.assertTrue(stats.is_secondary)
      self.assertTrue(stats.peer_primary)
      self.assertTrue(stats.is_diskless)

  def testMinor8(self):
    """Test standalone device (minor 8)"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[8])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_standalone)
      self.assertEqual(stats.rrole, 'Unknown')
      self.assertTrue(stats.is_disk_uptodate)
191 if __name__ == '__main__':