Fix /proc/drbd parsing in presence of gaps
[ganeti-local] / test / ganeti.bdev_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the bdev module"""
23
24
25 import os
26 import unittest
27
28 import testutils
29 from ganeti import bdev
30 from ganeti import errors
31
32
33 class TestDRBD8Runner(testutils.GanetiTestCase):
34   """Testing case for DRBD8"""
35
36   @staticmethod
37   def _has_disk(data, dname, mname):
38     """Check local disk corectness"""
39     retval = (
40       "local_dev" in data and
41       data["local_dev"] == dname and
42       "meta_dev" in data and
43       data["meta_dev"] == mname and
44       "meta_index" in data and
45       data["meta_index"] == 0
46       )
47     return retval
48
49   @staticmethod
50   def _has_net(data, local, remote):
51     """Check network connection parameters"""
52     retval = (
53       "local_addr" in data and
54       data["local_addr"] == local and
55       "remote_addr" in data and
56       data["remote_addr"] == remote
57       )
58     return retval
59
60   def testParserCreation(self):
61     """Test drbdsetup show parser creation"""
62     bdev.DRBD8._GetShowParser()
63
64   def testParserBoth80(self):
65     """Test drbdsetup show parser for disk and network"""
66     data = self._ReadTestData("bdev-both.txt")
67     result = bdev.DRBD8._GetDevInfo(data)
68     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
69                                    "/dev/xenvg/test.meta"),
70                     "Wrong local disk info")
71     self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
72                                   ("192.168.1.2", 11000)),
73                     "Wrong network info (8.0.x)")
74
75   def testParserBoth83(self):
76     """Test drbdsetup show parser for disk and network"""
77     data = self._ReadTestData("bdev-8.3-both.txt")
78     result = bdev.DRBD8._GetDevInfo(data)
79     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
80                                    "/dev/xenvg/test.meta"),
81                     "Wrong local disk info")
82     self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
83                                   ("192.168.1.2", 11000)),
84                     "Wrong network info (8.2.x)")
85
86   def testParserNet(self):
87     """Test drbdsetup show parser for disk and network"""
88     data = self._ReadTestData("bdev-net.txt")
89     result = bdev.DRBD8._GetDevInfo(data)
90     self.failUnless(("local_dev" not in result and
91                      "meta_dev" not in result and
92                      "meta_index" not in result),
93                     "Should not find local disk info")
94     self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
95                                   ("192.168.1.2", 11002)),
96                     "Wrong network info")
97
98   def testParserDisk(self):
99     """Test drbdsetup show parser for disk and network"""
100     data = self._ReadTestData("bdev-disk.txt")
101     result = bdev.DRBD8._GetDevInfo(data)
102     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
103                                    "/dev/xenvg/test.meta"),
104                     "Wrong local disk info")
105     self.failUnless(("local_addr" not in result and
106                      "remote_addr" not in result),
107                     "Should not find network info")
108
109
110 class TestDRBD8Status(testutils.GanetiTestCase):
111   """Testing case for DRBD8 /proc status"""
112
113   def setUp(self):
114     """Read in txt data"""
115     testutils.GanetiTestCase.setUp(self)
116     proc_data = self._TestDataFilename("proc_drbd8.txt")
117     proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
118     proc83_data = self._TestDataFilename("proc_drbd83.txt")
119     self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
120     self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
121     self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
122     self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
123     self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
124     self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
125
126   def testIOErrors(self):
127     """Test handling of errors while reading the proc file."""
128     temp_file = self._CreateTempFile()
129     os.unlink(temp_file)
130     self.failUnlessRaises(errors.BlockDeviceError,
131                           bdev.DRBD8._GetProcData, filename=temp_file)
132
133   def testMinorNotFound(self):
134     """Test not-found-minor in /proc"""
135     self.failUnless(9 not in self.mass_data)
136     self.failUnless(9 not in self.mass83_data)
137     self.failUnless(3 not in self.mass80e_data)
138
139   def testLineNotMatch(self):
140     """Test wrong line passed to DRBD8Status"""
141     self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
142
143   def testMinor0(self):
144     """Test connected, primary device"""
145     for data in [self.mass_data, self.mass83_data]:
146       stats = bdev.DRBD8Status(data[0])
147       self.failUnless(stats.is_in_use)
148       self.failUnless(stats.is_connected and stats.is_primary and
149                       stats.peer_secondary and stats.is_disk_uptodate)
150
151   def testMinor1(self):
152     """Test connected, secondary device"""
153     for data in [self.mass_data, self.mass83_data]:
154       stats = bdev.DRBD8Status(data[1])
155       self.failUnless(stats.is_in_use)
156       self.failUnless(stats.is_connected and stats.is_secondary and
157                       stats.peer_primary and stats.is_disk_uptodate)
158
159   def testMinor2(self):
160     """Test unconfigured device"""
161     for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
162       stats = bdev.DRBD8Status(data[2])
163       self.failIf(stats.is_in_use)
164
165   def testMinor4(self):
166     """Test WFconn device"""
167     for data in [self.mass_data, self.mass83_data]:
168       stats = bdev.DRBD8Status(data[4])
169       self.failUnless(stats.is_in_use)
170       self.failUnless(stats.is_wfconn and stats.is_primary and
171                       stats.rrole == 'Unknown' and
172                       stats.is_disk_uptodate)
173
174   def testMinor6(self):
175     """Test diskless device"""
176     for data in [self.mass_data, self.mass83_data]:
177       stats = bdev.DRBD8Status(data[6])
178       self.failUnless(stats.is_in_use)
179       self.failUnless(stats.is_connected and stats.is_secondary and
180                       stats.peer_primary and stats.is_diskless)
181
182   def testMinor8(self):
183     """Test standalone device"""
184     for data in [self.mass_data, self.mass83_data]:
185       stats = bdev.DRBD8Status(data[8])
186       self.failUnless(stats.is_in_use)
187       self.failUnless(stats.is_standalone and
188                       stats.rrole == 'Unknown' and
189                       stats.is_disk_uptodate)
190
191 if __name__ == '__main__':
192   unittest.main()