Get rid of httperror module
[ganeti-local] / test / ganeti.bdev_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the bdev module"""
23
24
25 import os
26 import unittest
27
28 from ganeti import bdev
29 from ganeti import errors
30
31
32 class TestDRBD8Runner(unittest.TestCase):
33   """Testing case for DRBD8"""
34
35   @staticmethod
36   def _has_disk(data, dname, mname):
37     """Check local disk corectness"""
38     retval = (
39       "local_dev" in data and
40       data["local_dev"] == dname and
41       "meta_dev" in data and
42       data["meta_dev"] == mname and
43       "meta_index" in data and
44       data["meta_index"] == 0
45       )
46     return retval
47
48   @staticmethod
49   def _get_contents(name):
50     """Returns the contents of a file"""
51
52     prefix = os.environ.get("srcdir", None)
53     if prefix:
54       name = prefix + "/test/" + name
55     fh = open(name, "r")
56     try:
57       data = fh.read()
58     finally:
59       fh.close()
60     return data
61
62   @staticmethod
63   def _has_net(data, local, remote):
64     """Check network connection parameters"""
65     retval = (
66       "local_addr" in data and
67       data["local_addr"] == local and
68       "remote_addr" in data and
69       data["remote_addr"] == remote
70       )
71     return retval
72
73   def testParserCreation(self):
74     """Test drbdsetup show parser creation"""
75     bdev.DRBD8._GetShowParser()
76
77   def testParserBoth(self):
78     """Test drbdsetup show parser for disk and network"""
79     data = self._get_contents("data/bdev-both.txt")
80     result = bdev.DRBD8._GetDevInfo(data)
81     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
82                                    "/dev/xenvg/test.meta"),
83                     "Wrong local disk info")
84     self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
85                                   ("192.168.1.2", 11000)),
86                     "Wrong network info")
87
88   def testParserNet(self):
89     """Test drbdsetup show parser for disk and network"""
90     data = self._get_contents("data/bdev-net.txt")
91     result = bdev.DRBD8._GetDevInfo(data)
92     self.failUnless(("local_dev" not in result and
93                      "meta_dev" not in result and
94                      "meta_index" not in result),
95                     "Should not find local disk info")
96     self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
97                                   ("192.168.1.2", 11002)),
98                     "Wrong network info")
99
100   def testParserDisk(self):
101     """Test drbdsetup show parser for disk and network"""
102     data = self._get_contents("data/bdev-disk.txt")
103     result = bdev.DRBD8._GetDevInfo(data)
104     self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
105                                    "/dev/xenvg/test.meta"),
106                     "Wrong local disk info")
107     self.failUnless(("local_addr" not in result and
108                      "remote_addr" not in result),
109                     "Should not find network info")
110
111
112 class TestDRBD8Status(unittest.TestCase):
113   """Testing case for DRBD8 /proc status"""
114
115   def setUp(self):
116     """Read in txt data"""
117     proc_data = "test/data/proc_drbd8.txt"
118     prefix = os.environ.get("srcdir", None)
119     if prefix:
120       proc_data = prefix + "/" + proc_data
121     self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
122     self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
123
124   def testMinorNotFound(self):
125     """Test not-found-minor in /proc"""
126     self.failUnless(9 not in self.mass_data)
127
128   def testLineNotMatch(self):
129     """Test wrong line passed to DRBD8Status"""
130     self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
131
132   def testMinor0(self):
133     """Test connected, primary device"""
134     stats = bdev.DRBD8Status(self.mass_data[0])
135     self.failUnless(stats.is_connected and stats.is_primary and
136                     stats.peer_secondary and stats.is_disk_uptodate)
137
138   def testMinor1(self):
139     """Test connected, secondary device"""
140     stats = bdev.DRBD8Status(self.mass_data[1])
141     self.failUnless(stats.is_connected and stats.is_secondary and
142                     stats.peer_primary and stats.is_disk_uptodate)
143
144   def testMinor4(self):
145     """Test WFconn device"""
146     stats = bdev.DRBD8Status(self.mass_data[4])
147     self.failUnless(stats.is_wfconn and stats.is_primary and
148                     stats.rrole == 'Unknown' and
149                     stats.is_disk_uptodate)
150
151   def testMinor6(self):
152     """Test diskless device"""
153     stats = bdev.DRBD8Status(self.mass_data[6])
154     self.failUnless(stats.is_connected and stats.is_secondary and
155                     stats.peer_primary and stats.is_diskless)
156
157   def testMinor8(self):
158     """Test standalone device"""
159     stats = bdev.DRBD8Status(self.mass_data[8])
160     self.failUnless(stats.is_standalone and
161                     stats.rrole == 'Unknown' and
162                     stats.is_disk_uptodate)
163
164 if __name__ == '__main__':
165   unittest.main()