Add the 2.0-specific node flags to the design doc
[ganeti-local] / test / ganeti.bdev_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the bdev module"""
23
24
25 import os
26 import unittest
27
28 import testutils
29 from ganeti import bdev
30 from ganeti import errors
31
32
33 class TestDRBD8Runner(testutils.GanetiTestCase):
34   """Testing case for DRBD8"""
35
36   @staticmethod
37   def _has_disk(data, dname, mname):
38     """Check local disk corectness"""
39     retval = (
40       "local_dev" in data and
41       data["local_dev"] == dname and
42       "meta_dev" in data and
43       data["meta_dev"] == mname and
44       "meta_index" in data and
45       data["meta_index"] == 0
46       )
47     return retval
48
49   @staticmethod
50   def _has_net(data, local, remote):
51     """Check network connection parameters"""
52     retval = (
53       "local_addr" in data and
54       data["local_addr"] == local and
55       "remote_addr" in data and
56       data["remote_addr"] == remote
57       )
58     return retval
59
60   def testParserCreation(self):
61     """Test drbdsetup show parser creation"""
62     bdev.DRBD8._GetShowParser()
63
64   def testParserBoth(self):
65     """Test drbdsetup show parser for disk and network"""
66     data = self._ReadTestData("bdev-both.txt")
67     result = bdev.DRBD8._GetDevInfo(data)
68     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
69                                    "/dev/xenvg/test.meta"),
70                     "Wrong local disk info")
71     self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
72                                   ("192.168.1.2", 11000)),
73                     "Wrong network info")
74
75   def testParserNet(self):
76     """Test drbdsetup show parser for disk and network"""
77     data = self._ReadTestData("bdev-net.txt")
78     result = bdev.DRBD8._GetDevInfo(data)
79     self.failUnless(("local_dev" not in result and
80                      "meta_dev" not in result and
81                      "meta_index" not in result),
82                     "Should not find local disk info")
83     self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
84                                   ("192.168.1.2", 11002)),
85                     "Wrong network info")
86
87   def testParserDisk(self):
88     """Test drbdsetup show parser for disk and network"""
89     data = self._ReadTestData("bdev-disk.txt")
90     result = bdev.DRBD8._GetDevInfo(data)
91     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
92                                    "/dev/xenvg/test.meta"),
93                     "Wrong local disk info")
94     self.failUnless(("local_addr" not in result and
95                      "remote_addr" not in result),
96                     "Should not find network info")
97
98
99 class TestDRBD8Status(testutils.GanetiTestCase):
100   """Testing case for DRBD8 /proc status"""
101
102   def setUp(self):
103     """Read in txt data"""
104     testutils.GanetiTestCase.setUp(self)
105     proc_data = self._TestDataFilename("proc_drbd8.txt")
106     self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
107     self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
108
109   def testIOErrors(self):
110     """Test handling of errors while reading the proc file."""
111     temp_file = self._CreateTempFile()
112     os.unlink(temp_file)
113     self.failUnlessRaises(errors.BlockDeviceError,
114                           bdev.DRBD8._GetProcData, filename=temp_file)
115
116   def testMinorNotFound(self):
117     """Test not-found-minor in /proc"""
118     self.failUnless(9 not in self.mass_data)
119
120   def testLineNotMatch(self):
121     """Test wrong line passed to DRBD8Status"""
122     self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
123
124   def testMinor0(self):
125     """Test connected, primary device"""
126     stats = bdev.DRBD8Status(self.mass_data[0])
127     self.failUnless(stats.is_in_use)
128     self.failUnless(stats.is_connected and stats.is_primary and
129                     stats.peer_secondary and stats.is_disk_uptodate)
130
131   def testMinor1(self):
132     """Test connected, secondary device"""
133     stats = bdev.DRBD8Status(self.mass_data[1])
134     self.failUnless(stats.is_in_use)
135     self.failUnless(stats.is_connected and stats.is_secondary and
136                     stats.peer_primary and stats.is_disk_uptodate)
137
138   def testMinor2(self):
139     """Test unconfigured device"""
140     stats = bdev.DRBD8Status(self.mass_data[2])
141     self.failIf(stats.is_in_use)
142
143   def testMinor4(self):
144     """Test WFconn device"""
145     stats = bdev.DRBD8Status(self.mass_data[4])
146     self.failUnless(stats.is_in_use)
147     self.failUnless(stats.is_wfconn and stats.is_primary and
148                     stats.rrole == 'Unknown' and
149                     stats.is_disk_uptodate)
150
151   def testMinor6(self):
152     """Test diskless device"""
153     stats = bdev.DRBD8Status(self.mass_data[6])
154     self.failUnless(stats.is_in_use)
155     self.failUnless(stats.is_connected and stats.is_secondary and
156                     stats.peer_primary and stats.is_diskless)
157
158   def testMinor8(self):
159     """Test standalone device"""
160     stats = bdev.DRBD8Status(self.mass_data[8])
161     self.failUnless(stats.is_in_use)
162     self.failUnless(stats.is_standalone and
163                     stats.rrole == 'Unknown' and
164                     stats.is_disk_uptodate)
165
166 if __name__ == '__main__':
167   unittest.main()