4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
29 from ganeti import bdev
30 from ganeti import errors
33 class TestDRBD8Runner(testutils.GanetiTestCase):
34 """Testing case for DRBD8"""
37 def _has_disk(data, dname, mname):
38 """Check local disk corectness"""
40 "local_dev" in data and
41 data["local_dev"] == dname and
42 "meta_dev" in data and
43 data["meta_dev"] == mname and
44 "meta_index" in data and
45 data["meta_index"] == 0
50 def _has_net(data, local, remote):
51 """Check network connection parameters"""
53 "local_addr" in data and
54 data["local_addr"] == local and
55 "remote_addr" in data and
56 data["remote_addr"] == remote
def testParserCreation(self):
    """Building the drbdsetup show parser must not raise."""
    drbd8_cls = bdev.DRBD8
    # Only construction is exercised; the returned parser is discarded.
    drbd8_cls._GetShowParser()
def testParserBoth80(self):
    """Test drbdsetup show parser for disk and network (8.0.x data)."""
    data = self._ReadTestData("bdev-both.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # assertTrue replaces the deprecated failUnless alias.
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11000),
                                  ("192.168.1.2", 11000)),
                    "Wrong network info (8.0.x)")
def testParserBoth83(self):
    """Test drbdsetup show parser for disk and network (8.3.x data)."""
    data = self._ReadTestData("bdev-8.3-both.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # assertTrue replaces the deprecated failUnless alias.
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # The failure message names the version of the fixture actually read
    # (bdev-8.3-both.txt), so a failure points at the right data file.
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11000),
                                  ("192.168.1.2", 11000)),
                    "Wrong network info (8.3.x)")
def testParserNet(self):
    """Test drbdsetup show parser for a network-only device."""
    data = self._ReadTestData("bdev-net.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # None of the local-disk keys may be present in the parsed result.
    self.assertTrue(("local_dev" not in result and
                     "meta_dev" not in result and
                     "meta_index" not in result),
                    "Should not find local disk info")
    # NOTE(review): the failure message below was reconstructed to match
    # the sibling tests; confirm the exact wording against upstream.
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11002),
                                  ("192.168.1.2", 11002)),
                    "Wrong network info")
def testParserDisk(self):
    """Test drbdsetup show parser for a disk-only device."""
    data = self._ReadTestData("bdev-disk.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # assertTrue replaces the deprecated failUnless alias.
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # Neither network key may be present in the parsed result.
    self.assertTrue(("local_addr" not in result and
                     "remote_addr" not in result),
                    "Should not find network info")
110 class TestDRBD8Status(testutils.GanetiTestCase):
111 """Testing case for DRBD8 /proc status"""
def setUp(self):
    """Read in txt data.

    Loads the two status fixtures ("proc_drbd8.txt" and "proc_drbd83.txt")
    and stores both the raw result of DRBD8._GetProcData and the result of
    DRBD8._MassageProcData on the instance, for use by the test methods.
    """
    testutils.GanetiTestCase.setUp(self)
    proc_data = self._TestDataFilename("proc_drbd8.txt")
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
def testIOErrors(self):
    """Test handling of errors while reading the proc file."""
    temp_file = self._CreateTempFile()
    # NOTE(review): as written, this relies on _GetProcData rejecting the
    # freshly created temp file as-is; confirm that no preparation step
    # (e.g. making the file unreadable) is expected between these lines.
    # assertRaises replaces the deprecated failUnlessRaises alias and
    # matches the spelling already used by testLineNotMatch.
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8._GetProcData, filename=temp_file)
130 def testMinorNotFound(self):
131 """Test not-found-minor in /proc"""
132 self.failUnless(9 not in self.mass_data)
133 self.failUnless(9 not in self.mass83_data)
def testLineNotMatch(self):
    """A line that is not valid status data must be rejected."""
    bogus_line = "foo"
    expected_error = errors.BlockDeviceError
    # Constructing DRBD8Status from the bogus line has to raise.
    self.assertRaises(expected_error, bdev.DRBD8Status, bogus_line)
def testMinor0(self):
    """Test connected, primary device"""
    # The same expectations hold for both fixture data sets.
    for data in [self.mass_data, self.mass83_data]:
        stats = bdev.DRBD8Status(data[0])
        self.assertTrue(stats.is_in_use)
        # One assertion per property, so a failure names the flag that
        # was wrong instead of reporting the whole conjunction.
        self.assertTrue(stats.is_connected)
        self.assertTrue(stats.is_primary)
        self.assertTrue(stats.peer_secondary)
        self.assertTrue(stats.is_disk_uptodate)
def testMinor1(self):
    """Test connected, secondary device"""
    # The same expectations hold for both fixture data sets.
    for data in [self.mass_data, self.mass83_data]:
        stats = bdev.DRBD8Status(data[1])
        self.assertTrue(stats.is_in_use)
        # One assertion per property, so a failure names the flag that
        # was wrong instead of reporting the whole conjunction.
        self.assertTrue(stats.is_connected)
        self.assertTrue(stats.is_secondary)
        self.assertTrue(stats.peer_primary)
        self.assertTrue(stats.is_disk_uptodate)
def testMinor2(self):
    """Test unconfigured device"""
    # The same expectation holds for both fixture data sets.
    for data in [self.mass_data, self.mass83_data]:
        stats = bdev.DRBD8Status(data[2])
        # assertFalse replaces the deprecated failIf alias.
        self.assertFalse(stats.is_in_use)
def testMinor4(self):
    """Test WFconn device"""
    # The same expectations hold for both fixture data sets.
    for data in [self.mass_data, self.mass83_data]:
        stats = bdev.DRBD8Status(data[4])
        self.assertTrue(stats.is_in_use)
        # One assertion per property, so a failure names the flag that
        # was wrong instead of reporting the whole conjunction.
        self.assertTrue(stats.is_wfconn)
        self.assertTrue(stats.is_primary)
        self.assertTrue(stats.rrole == 'Unknown')
        self.assertTrue(stats.is_disk_uptodate)
def testMinor6(self):
    """Test diskless device"""
    # The same expectations hold for both fixture data sets.
    for data in [self.mass_data, self.mass83_data]:
        stats = bdev.DRBD8Status(data[6])
        self.assertTrue(stats.is_in_use)
        # One assertion per property, so a failure names the flag that
        # was wrong instead of reporting the whole conjunction.
        self.assertTrue(stats.is_connected)
        self.assertTrue(stats.is_secondary)
        self.assertTrue(stats.peer_primary)
        self.assertTrue(stats.is_diskless)
def testMinor8(self):
    """Test standalone device"""
    # The same expectations hold for both fixture data sets.
    for data in [self.mass_data, self.mass83_data]:
        stats = bdev.DRBD8Status(data[8])
        self.assertTrue(stats.is_in_use)
        # One assertion per property, so a failure names the flag that
        # was wrong instead of reporting the whole conjunction.
        self.assertTrue(stats.is_standalone)
        self.assertTrue(stats.rrole == 'Unknown')
        self.assertTrue(stats.is_disk_uptodate)
187 if __name__ == '__main__':