Watcher: fix some doc typos
[ganeti-local] / test / ganeti.bdev_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the bdev module"""
23
24
25 import os
26 import unittest
27
28 from ganeti import bdev
29 from ganeti import errors
30
31 import testutils
32
33
34 class TestDRBD8Runner(testutils.GanetiTestCase):
35   """Testing case for DRBD8"""
36
37   @staticmethod
38   def _has_disk(data, dname, mname):
39     """Check local disk corectness"""
40     retval = (
41       "local_dev" in data and
42       data["local_dev"] == dname and
43       "meta_dev" in data and
44       data["meta_dev"] == mname and
45       "meta_index" in data and
46       data["meta_index"] == 0
47       )
48     return retval
49
50   @staticmethod
51   def _has_net(data, local, remote):
52     """Check network connection parameters"""
53     retval = (
54       "local_addr" in data and
55       data["local_addr"] == local and
56       "remote_addr" in data and
57       data["remote_addr"] == remote
58       )
59     return retval
60
61   def testParserCreation(self):
62     """Test drbdsetup show parser creation"""
63     bdev.DRBD8._GetShowParser()
64
65   def testParserBoth80(self):
66     """Test drbdsetup show parser for disk and network"""
67     data = self._ReadTestData("bdev-both.txt")
68     result = bdev.DRBD8._GetDevInfo(data)
69     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
70                                    "/dev/xenvg/test.meta"),
71                     "Wrong local disk info")
72     self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
73                                   ("192.168.1.2", 11000)),
74                     "Wrong network info (8.0.x)")
75
76   def testParserBoth83(self):
77     """Test drbdsetup show parser for disk and network"""
78     data = self._ReadTestData("bdev-8.3-both.txt")
79     result = bdev.DRBD8._GetDevInfo(data)
80     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
81                                    "/dev/xenvg/test.meta"),
82                     "Wrong local disk info")
83     self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
84                                   ("192.168.1.2", 11000)),
85                     "Wrong network info (8.2.x)")
86
87   def testParserNet(self):
88     """Test drbdsetup show parser for disk and network"""
89     data = self._ReadTestData("bdev-net.txt")
90     result = bdev.DRBD8._GetDevInfo(data)
91     self.failUnless(("local_dev" not in result and
92                      "meta_dev" not in result and
93                      "meta_index" not in result),
94                     "Should not find local disk info")
95     self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
96                                   ("192.168.1.2", 11002)),
97                     "Wrong network info")
98
99   def testParserDisk(self):
100     """Test drbdsetup show parser for disk and network"""
101     data = self._ReadTestData("bdev-disk.txt")
102     result = bdev.DRBD8._GetDevInfo(data)
103     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
104                                    "/dev/xenvg/test.meta"),
105                     "Wrong local disk info")
106     self.failUnless(("local_addr" not in result and
107                      "remote_addr" not in result),
108                     "Should not find network info")
109
110
111 class TestDRBD8Status(testutils.GanetiTestCase):
112   """Testing case for DRBD8 /proc status"""
113
114   def setUp(self):
115     """Read in txt data"""
116     testutils.GanetiTestCase.setUp(self)
117     proc_data = self._TestDataFilename("proc_drbd8.txt")
118     proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
119     proc83_data = self._TestDataFilename("proc_drbd83.txt")
120     self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
121     self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
122     self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
123     self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
124     self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
125     self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
126
127   def testIOErrors(self):
128     """Test handling of errors while reading the proc file."""
129     temp_file = self._CreateTempFile()
130     os.unlink(temp_file)
131     self.failUnlessRaises(errors.BlockDeviceError,
132                           bdev.DRBD8._GetProcData, filename=temp_file)
133
134   def testMinorNotFound(self):
135     """Test not-found-minor in /proc"""
136     self.failUnless(9 not in self.mass_data)
137     self.failUnless(9 not in self.mass83_data)
138     self.failUnless(3 not in self.mass80e_data)
139
140   def testLineNotMatch(self):
141     """Test wrong line passed to DRBD8Status"""
142     self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
143
144   def testMinor0(self):
145     """Test connected, primary device"""
146     for data in [self.mass_data, self.mass83_data]:
147       stats = bdev.DRBD8Status(data[0])
148       self.failUnless(stats.is_in_use)
149       self.failUnless(stats.is_connected and stats.is_primary and
150                       stats.peer_secondary and stats.is_disk_uptodate)
151
152   def testMinor1(self):
153     """Test connected, secondary device"""
154     for data in [self.mass_data, self.mass83_data]:
155       stats = bdev.DRBD8Status(data[1])
156       self.failUnless(stats.is_in_use)
157       self.failUnless(stats.is_connected and stats.is_secondary and
158                       stats.peer_primary and stats.is_disk_uptodate)
159
160   def testMinor2(self):
161     """Test unconfigured device"""
162     for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
163       stats = bdev.DRBD8Status(data[2])
164       self.failIf(stats.is_in_use)
165
166   def testMinor4(self):
167     """Test WFconn device"""
168     for data in [self.mass_data, self.mass83_data]:
169       stats = bdev.DRBD8Status(data[4])
170       self.failUnless(stats.is_in_use)
171       self.failUnless(stats.is_wfconn and stats.is_primary and
172                       stats.rrole == 'Unknown' and
173                       stats.is_disk_uptodate)
174
175   def testMinor6(self):
176     """Test diskless device"""
177     for data in [self.mass_data, self.mass83_data]:
178       stats = bdev.DRBD8Status(data[6])
179       self.failUnless(stats.is_in_use)
180       self.failUnless(stats.is_connected and stats.is_secondary and
181                       stats.peer_primary and stats.is_diskless)
182
183   def testMinor8(self):
184     """Test standalone device"""
185     for data in [self.mass_data, self.mass83_data]:
186       stats = bdev.DRBD8Status(data[8])
187       self.failUnless(stats.is_in_use)
188       self.failUnless(stats.is_standalone and
189                       stats.rrole == 'Unknown' and
190                       stats.is_disk_uptodate)
191
192 if __name__ == '__main__':
193   testutils.GanetiTestProgram()