4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
28 from ganeti import bdev
29 from ganeti import errors
32 class TestDRBD8Runner(unittest.TestCase):
33 """Testing case for DRBD8"""
36 def _has_disk(data, dname, mname):
37 """Check local disk corectness"""
39 "local_dev" in data and
40 data["local_dev"] == dname and
41 "meta_dev" in data and
42 data["meta_dev"] == mname and
43 "meta_index" in data and
44 data["meta_index"] == 0
49 def _get_contents(name):
50 """Returns the contents of a file"""
52 prefix = os.environ.get("srcdir", None)
54 name = prefix + "/test/" + name
63 def _has_net(data, local, remote):
64 """Check network connection parameters"""
66 "local_addr" in data and
67 data["local_addr"] == local and
68 "remote_addr" in data and
69 data["remote_addr"] == remote
73 def testParserCreation(self):
74 """Test drbdsetup show parser creation"""
75 bdev.DRBD8._GetShowParser()
77 def testParserBoth(self):
78 """Test drbdsetup show parser for disk and network"""
79 data = self._get_contents("data/bdev-both.txt")
80 result = bdev.DRBD8._GetDevInfo(data)
81 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
82 "/dev/xenvg/test.meta"),
83 "Wrong local disk info")
84 self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
85 ("192.168.1.2", 11000)),
88 def testParserNet(self):
89 """Test drbdsetup show parser for disk and network"""
90 data = self._get_contents("data/bdev-net.txt")
91 result = bdev.DRBD8._GetDevInfo(data)
92 self.failUnless(("local_dev" not in result and
93 "meta_dev" not in result and
94 "meta_index" not in result),
95 "Should not find local disk info")
96 self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
97 ("192.168.1.2", 11002)),
100 def testParserDisk(self):
101 """Test drbdsetup show parser for disk and network"""
102 data = self._get_contents("data/bdev-disk.txt")
103 result = bdev.DRBD8._GetDevInfo(data)
104 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
105 "/dev/xenvg/test.meta"),
106 "Wrong local disk info")
107 self.failUnless(("local_addr" not in result and
108 "remote_addr" not in result),
109 "Should not find network info")
112 class TestDRBD8Status(unittest.TestCase):
113 """Testing case for DRBD8 /proc status"""
116 """Read in txt data"""
117 proc_data = "test/data/proc_drbd8.txt"
118 prefix = os.environ.get("srcdir", None)
120 proc_data = prefix + "/" + proc_data
121 self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
122 self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
124 def testMinorNotFound(self):
125 """Test not-found-minor in /proc"""
126 self.failUnless(9 not in self.mass_data)
128 def testLineNotMatch(self):
129 """Test wrong line passed to DRBD8Status"""
130 self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
132 def testMinor0(self):
133 """Test connected, primary device"""
134 stats = bdev.DRBD8Status(self.mass_data[0])
135 self.failUnless(stats.is_connected and stats.is_primary and
136 stats.peer_secondary and stats.is_disk_uptodate)
138 def testMinor1(self):
139 """Test connected, secondary device"""
140 stats = bdev.DRBD8Status(self.mass_data[1])
141 self.failUnless(stats.is_connected and stats.is_secondary and
142 stats.peer_primary and stats.is_disk_uptodate)
144 def testMinor4(self):
145 """Test WFconn device"""
146 stats = bdev.DRBD8Status(self.mass_data[4])
147 self.failUnless(stats.is_wfconn and stats.is_primary and
148 stats.rrole == 'Unknown' and
149 stats.is_disk_uptodate)
151 def testMinor6(self):
152 """Test diskless device"""
153 stats = bdev.DRBD8Status(self.mass_data[6])
154 self.failUnless(stats.is_connected and stats.is_secondary and
155 stats.peer_primary and stats.is_diskless)
157 def testMinor8(self):
158 """Test standalone device"""
159 stats = bdev.DRBD8Status(self.mass_data[8])
160 self.failUnless(stats.is_standalone and
161 stats.rrole == 'Unknown' and
162 stats.is_disk_uptodate)
164 if __name__ == '__main__':