4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
29 from ganeti import bdev
30 from ganeti import errors
33 class TestDRBD8Runner(testutils.GanetiTestCase):
34 """Testing case for DRBD8"""
37 def _has_disk(data, dname, mname):
38 """Check local disk corectness"""
40 "local_dev" in data and
41 data["local_dev"] == dname and
42 "meta_dev" in data and
43 data["meta_dev"] == mname and
44 "meta_index" in data and
45 data["meta_index"] == 0
50 def _has_net(data, local, remote):
51 """Check network connection parameters"""
53 "local_addr" in data and
54 data["local_addr"] == local and
55 "remote_addr" in data and
56 data["remote_addr"] == remote
60 def testParserCreation(self):
61 """Test drbdsetup show parser creation"""
62 bdev.DRBD8._GetShowParser()
64 def testParserBoth(self):
65 """Test drbdsetup show parser for disk and network"""
66 data = self._ReadTestData("bdev-both.txt")
67 result = bdev.DRBD8._GetDevInfo(data)
68 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
69 "/dev/xenvg/test.meta"),
70 "Wrong local disk info")
71 self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
72 ("192.168.1.2", 11000)),
75 def testParserNet(self):
76 """Test drbdsetup show parser for disk and network"""
77 data = self._ReadTestData("bdev-net.txt")
78 result = bdev.DRBD8._GetDevInfo(data)
79 self.failUnless(("local_dev" not in result and
80 "meta_dev" not in result and
81 "meta_index" not in result),
82 "Should not find local disk info")
83 self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
84 ("192.168.1.2", 11002)),
87 def testParserDisk(self):
88 """Test drbdsetup show parser for disk and network"""
89 data = self._ReadTestData("bdev-disk.txt")
90 result = bdev.DRBD8._GetDevInfo(data)
91 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
92 "/dev/xenvg/test.meta"),
93 "Wrong local disk info")
94 self.failUnless(("local_addr" not in result and
95 "remote_addr" not in result),
96 "Should not find network info")
99 class TestDRBD8Status(testutils.GanetiTestCase):
100 """Testing case for DRBD8 /proc status"""
103 """Read in txt data"""
104 testutils.GanetiTestCase.setUp(self)
105 proc_data = self._TestDataFilename("proc_drbd8.txt")
106 self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
107 self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
109 def testIOErrors(self):
110 """Test handling of errors while reading the proc file."""
111 temp_file = self._CreateTempFile()
113 self.failUnlessRaises(errors.BlockDeviceError,
114 bdev.DRBD8._GetProcData, filename=temp_file)
116 def testMinorNotFound(self):
117 """Test not-found-minor in /proc"""
118 self.failUnless(9 not in self.mass_data)
120 def testLineNotMatch(self):
121 """Test wrong line passed to DRBD8Status"""
122 self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
124 def testMinor0(self):
125 """Test connected, primary device"""
126 stats = bdev.DRBD8Status(self.mass_data[0])
127 self.failUnless(stats.is_in_use)
128 self.failUnless(stats.is_connected and stats.is_primary and
129 stats.peer_secondary and stats.is_disk_uptodate)
131 def testMinor1(self):
132 """Test connected, secondary device"""
133 stats = bdev.DRBD8Status(self.mass_data[1])
134 self.failUnless(stats.is_in_use)
135 self.failUnless(stats.is_connected and stats.is_secondary and
136 stats.peer_primary and stats.is_disk_uptodate)
138 def testMinor2(self):
139 """Test unconfigured device"""
140 stats = bdev.DRBD8Status(self.mass_data[2])
141 self.failIf(stats.is_in_use)
143 def testMinor4(self):
144 """Test WFconn device"""
145 stats = bdev.DRBD8Status(self.mass_data[4])
146 self.failUnless(stats.is_in_use)
147 self.failUnless(stats.is_wfconn and stats.is_primary and
148 stats.rrole == 'Unknown' and
149 stats.is_disk_uptodate)
151 def testMinor6(self):
152 """Test diskless device"""
153 stats = bdev.DRBD8Status(self.mass_data[6])
154 self.failUnless(stats.is_in_use)
155 self.failUnless(stats.is_connected and stats.is_secondary and
156 stats.peer_primary and stats.is_diskless)
158 def testMinor8(self):
159 """Test standalone device"""
160 stats = bdev.DRBD8Status(self.mass_data[8])
161 self.failUnless(stats.is_in_use)
162 self.failUnless(stats.is_standalone and
163 stats.rrole == 'Unknown' and
164 stats.is_disk_uptodate)
166 if __name__ == '__main__':