Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.bdev_unittest.py @ 549071a0

History | View | Annotate | Download (8.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import unittest
27

    
28
from ganeti import bdev
29
from ganeti import errors
30

    
31
import testutils
32

    
33

    
34
class TestDRBD8Runner(testutils.GanetiTestCase):
35
  """Testing case for DRBD8"""
36

    
37
  @staticmethod
38
  def _has_disk(data, dname, mname):
39
    """Check local disk corectness"""
40
    retval = (
41
      "local_dev" in data and
42
      data["local_dev"] == dname and
43
      "meta_dev" in data and
44
      data["meta_dev"] == mname and
45
      "meta_index" in data and
46
      data["meta_index"] == 0
47
      )
48
    return retval
49

    
50
  @staticmethod
51
  def _has_net(data, local, remote):
52
    """Check network connection parameters"""
53
    retval = (
54
      "local_addr" in data and
55
      data["local_addr"] == local and
56
      "remote_addr" in data and
57
      data["remote_addr"] == remote
58
      )
59
    return retval
60

    
61
  def testParserCreation(self):
62
    """Test drbdsetup show parser creation"""
63
    bdev.DRBD8._GetShowParser()
64

    
65
  def testParser80(self):
66
    """Test drbdsetup show parser for disk and network version 8.0"""
67
    data = self._ReadTestData("bdev-drbd-8.0.txt")
68
    result = bdev.DRBD8._GetDevInfo(data)
69
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
70
                                   "/dev/xenvg/test.meta"),
71
                    "Wrong local disk info")
72
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
73
                                  ("192.168.1.2", 11000)),
74
                    "Wrong network info (8.0.x)")
75

    
76
  def testParser83(self):
77
    """Test drbdsetup show parser for disk and network version 8.3"""
78
    data = self._ReadTestData("bdev-drbd-8.3.txt")
79
    result = bdev.DRBD8._GetDevInfo(data)
80
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
81
                                   "/dev/xenvg/test.meta"),
82
                    "Wrong local disk info")
83
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
84
                                  ("192.168.1.2", 11000)),
85
                    "Wrong network info (8.0.x)")
86

    
87
  def testParserNetIP4(self):
88
    """Test drbdsetup show parser for IPv4 network"""
89
    data = self._ReadTestData("bdev-drbd-net-ip4.txt")
90
    result = bdev.DRBD8._GetDevInfo(data)
91
    self.failUnless(("local_dev" not in result and
92
                     "meta_dev" not in result and
93
                     "meta_index" not in result),
94
                    "Should not find local disk info")
95
    self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
96
                                  ("192.168.1.2", 11002)),
97
                    "Wrong network info (IPv4)")
98

    
99
  def testParserNetIP6(self):
100
    """Test drbdsetup show parser for IPv6 network"""
101
    data = self._ReadTestData("bdev-drbd-net-ip6.txt")
102
    result = bdev.DRBD8._GetDevInfo(data)
103
    self.failUnless(("local_dev" not in result and
104
                     "meta_dev" not in result and
105
                     "meta_index" not in result),
106
                    "Should not find local disk info")
107
    self.failUnless(self._has_net(result, ("2001:db8:65::1", 11048),
108
                                  ("2001:db8:66::1", 11048)),
109
                    "Wrong network info (IPv6)")
110

    
111
  def testParserDisk(self):
112
    """Test drbdsetup show parser for disk"""
113
    data = self._ReadTestData("bdev-drbd-disk.txt")
114
    result = bdev.DRBD8._GetDevInfo(data)
115
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
116
                                   "/dev/xenvg/test.meta"),
117
                    "Wrong local disk info")
118
    self.failUnless(("local_addr" not in result and
119
                     "remote_addr" not in result),
120
                    "Should not find network info")
121

    
122

    
123
class TestDRBD8Status(testutils.GanetiTestCase):
124
  """Testing case for DRBD8 /proc status"""
125

    
126
  def setUp(self):
127
    """Read in txt data"""
128
    testutils.GanetiTestCase.setUp(self)
129
    proc_data = self._TestDataFilename("proc_drbd8.txt")
130
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
131
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
132
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
133
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
134
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
135
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
136
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
137
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
138

    
139
  def testIOErrors(self):
140
    """Test handling of errors while reading the proc file."""
141
    temp_file = self._CreateTempFile()
142
    os.unlink(temp_file)
143
    self.failUnlessRaises(errors.BlockDeviceError,
144
                          bdev.DRBD8._GetProcData, filename=temp_file)
145

    
146
  def testHelper(self):
147
    """Test reading usermode_helper in /sys."""
148
    sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
149
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
150
    self.failUnlessEqual(drbd_helper, "/bin/true")
151

    
152
  def testHelperIOErrors(self):
153
    """Test handling of errors while reading usermode_helper in /sys."""
154
    temp_file = self._CreateTempFile()
155
    os.unlink(temp_file)
156
    self.failUnlessRaises(errors.BlockDeviceError,
157
                          bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
158

    
159
  def testMinorNotFound(self):
160
    """Test not-found-minor in /proc"""
161
    self.failUnless(9 not in self.mass_data)
162
    self.failUnless(9 not in self.mass83_data)
163
    self.failUnless(3 not in self.mass80e_data)
164

    
165
  def testLineNotMatch(self):
166
    """Test wrong line passed to DRBD8Status"""
167
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
168

    
169
  def testMinor0(self):
170
    """Test connected, primary device"""
171
    for data in [self.mass_data, self.mass83_data]:
172
      stats = bdev.DRBD8Status(data[0])
173
      self.failUnless(stats.is_in_use)
174
      self.failUnless(stats.is_connected and stats.is_primary and
175
                      stats.peer_secondary and stats.is_disk_uptodate)
176

    
177
  def testMinor1(self):
178
    """Test connected, secondary device"""
179
    for data in [self.mass_data, self.mass83_data]:
180
      stats = bdev.DRBD8Status(data[1])
181
      self.failUnless(stats.is_in_use)
182
      self.failUnless(stats.is_connected and stats.is_secondary and
183
                      stats.peer_primary and stats.is_disk_uptodate)
184

    
185
  def testMinor2(self):
186
    """Test unconfigured device"""
187
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
188
      stats = bdev.DRBD8Status(data[2])
189
      self.failIf(stats.is_in_use)
190

    
191
  def testMinor4(self):
192
    """Test WFconn device"""
193
    for data in [self.mass_data, self.mass83_data]:
194
      stats = bdev.DRBD8Status(data[4])
195
      self.failUnless(stats.is_in_use)
196
      self.failUnless(stats.is_wfconn and stats.is_primary and
197
                      stats.rrole == 'Unknown' and
198
                      stats.is_disk_uptodate)
199

    
200
  def testMinor6(self):
201
    """Test diskless device"""
202
    for data in [self.mass_data, self.mass83_data]:
203
      stats = bdev.DRBD8Status(data[6])
204
      self.failUnless(stats.is_in_use)
205
      self.failUnless(stats.is_connected and stats.is_secondary and
206
                      stats.peer_primary and stats.is_diskless)
207

    
208
  def testMinor8(self):
209
    """Test standalone device"""
210
    for data in [self.mass_data, self.mass83_data]:
211
      stats = bdev.DRBD8Status(data[8])
212
      self.failUnless(stats.is_in_use)
213
      self.failUnless(stats.is_standalone and
214
                      stats.rrole == 'Unknown' and
215
                      stats.is_disk_uptodate)
216

    
217
if __name__ == '__main__':
218
  testutils.GanetiTestProgram()