4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the bdev module"""
29 from ganeti import bdev
30 from ganeti import errors
33 class TestDRBD8Runner(testutils.GanetiTestCase):
34 """Testing case for DRBD8"""
37 def _has_disk(data, dname, mname):
38 """Check local disk corectness"""
40 "local_dev" in data and
41 data["local_dev"] == dname and
42 "meta_dev" in data and
43 data["meta_dev"] == mname and
44 "meta_index" in data and
45 data["meta_index"] == 0
50 def _has_net(data, local, remote):
51 """Check network connection parameters"""
53 "local_addr" in data and
54 data["local_addr"] == local and
55 "remote_addr" in data and
56 data["remote_addr"] == remote
60 def testParserCreation(self):
61 """Test drbdsetup show parser creation"""
62 bdev.DRBD8._GetShowParser()
64 def testParserBoth(self):
65 """Test drbdsetup show parser for disk and network"""
66 data = self._ReadTestData("bdev-both.txt")
67 result = bdev.DRBD8._GetDevInfo(data)
68 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
69 "/dev/xenvg/test.meta"),
70 "Wrong local disk info")
71 self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
72 ("192.168.1.2", 11000)),
75 def testParserNet(self):
76 """Test drbdsetup show parser for disk and network"""
77 data = self._ReadTestData("bdev-net.txt")
78 result = bdev.DRBD8._GetDevInfo(data)
79 self.failUnless(("local_dev" not in result and
80 "meta_dev" not in result and
81 "meta_index" not in result),
82 "Should not find local disk info")
83 self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
84 ("192.168.1.2", 11002)),
87 def testParserDisk(self):
88 """Test drbdsetup show parser for disk and network"""
89 data = self._ReadTestData("bdev-disk.txt")
90 result = bdev.DRBD8._GetDevInfo(data)
91 self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
92 "/dev/xenvg/test.meta"),
93 "Wrong local disk info")
94 self.failUnless(("local_addr" not in result and
95 "remote_addr" not in result),
96 "Should not find network info")
99 class TestDRBD8Status(testutils.GanetiTestCase):
100 """Testing case for DRBD8 /proc status"""
103 """Read in txt data"""
104 proc_data = self._TestDataFilename("proc_drbd8.txt")
105 self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
106 self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
108 def testMinorNotFound(self):
109 """Test not-found-minor in /proc"""
110 self.failUnless(9 not in self.mass_data)
112 def testLineNotMatch(self):
113 """Test wrong line passed to DRBD8Status"""
114 self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
116 def testMinor0(self):
117 """Test connected, primary device"""
118 stats = bdev.DRBD8Status(self.mass_data[0])
119 self.failUnless(stats.is_connected and stats.is_primary and
120 stats.peer_secondary and stats.is_disk_uptodate)
122 def testMinor1(self):
123 """Test connected, secondary device"""
124 stats = bdev.DRBD8Status(self.mass_data[1])
125 self.failUnless(stats.is_connected and stats.is_secondary and
126 stats.peer_primary and stats.is_disk_uptodate)
128 def testMinor4(self):
129 """Test WFconn device"""
130 stats = bdev.DRBD8Status(self.mass_data[4])
131 self.failUnless(stats.is_wfconn and stats.is_primary and
132 stats.rrole == 'Unknown' and
133 stats.is_disk_uptodate)
135 def testMinor6(self):
136 """Test diskless device"""
137 stats = bdev.DRBD8Status(self.mass_data[6])
138 self.failUnless(stats.is_connected and stats.is_secondary and
139 stats.peer_primary and stats.is_diskless)
141 def testMinor8(self):
142 """Test standalone device"""
143 stats = bdev.DRBD8Status(self.mass_data[8])
144 self.failUnless(stats.is_standalone and
145 stats.rrole == 'Unknown' and
146 stats.is_disk_uptodate)
148 if __name__ == '__main__':