Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.bdev_unittest.py @ 6760e4ed

History | View | Annotate | Download (8.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import unittest
27

    
28
from ganeti import bdev
29
from ganeti import errors
30

    
31
import testutils
32

    
33

    
34
class TestBaseDRBD(testutils.GanetiTestCase):
35
  def testGetVersion(self):
36
    data = [
37
      ["version: 8.0.12 (api:76/proto:86-91)"],
38
      ["version: 8.2.7 (api:88/proto:0-100)"],
39
      ["version: 8.3.7.49 (api:188/proto:13-191)"],
40
    ]
41
    result = [
42
      {
43
      "k_major": 8,
44
      "k_minor": 0,
45
      "k_point": 12,
46
      "api": 76,
47
      "proto": 86,
48
      "proto2": "91",
49
      },
50
      {
51
      "k_major": 8,
52
      "k_minor": 2,
53
      "k_point": 7,
54
      "api": 88,
55
      "proto": 0,
56
      "proto2": "100",
57
      },
58
      {
59
      "k_major": 8,
60
      "k_minor": 3,
61
      "k_point": 7,
62
      "api": 188,
63
      "proto": 13,
64
      "proto2": "191",
65
      }
66
    ]
67
    for d,r in zip(data, result):
68
      self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
69

    
70

    
71
class TestDRBD8Runner(testutils.GanetiTestCase):
72
  """Testing case for DRBD8"""
73

    
74
  @staticmethod
75
  def _has_disk(data, dname, mname):
76
    """Check local disk corectness"""
77
    retval = (
78
      "local_dev" in data and
79
      data["local_dev"] == dname and
80
      "meta_dev" in data and
81
      data["meta_dev"] == mname and
82
      "meta_index" in data and
83
      data["meta_index"] == 0
84
      )
85
    return retval
86

    
87
  @staticmethod
88
  def _has_net(data, local, remote):
89
    """Check network connection parameters"""
90
    retval = (
91
      "local_addr" in data and
92
      data["local_addr"] == local and
93
      "remote_addr" in data and
94
      data["remote_addr"] == remote
95
      )
96
    return retval
97

    
98
  def testParserCreation(self):
99
    """Test drbdsetup show parser creation"""
100
    bdev.DRBD8._GetShowParser()
101

    
102
  def testParser80(self):
103
    """Test drbdsetup show parser for disk and network version 8.0"""
104
    data = self._ReadTestData("bdev-drbd-8.0.txt")
105
    result = bdev.DRBD8._GetDevInfo(data)
106
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
107
                                   "/dev/xenvg/test.meta"),
108
                    "Wrong local disk info")
109
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
110
                                  ("192.0.2.2", 11000)),
111
                    "Wrong network info (8.0.x)")
112

    
113
  def testParser83(self):
114
    """Test drbdsetup show parser for disk and network version 8.3"""
115
    data = self._ReadTestData("bdev-drbd-8.3.txt")
116
    result = bdev.DRBD8._GetDevInfo(data)
117
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
118
                                   "/dev/xenvg/test.meta"),
119
                    "Wrong local disk info")
120
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
121
                                  ("192.0.2.2", 11000)),
122
                    "Wrong network info (8.0.x)")
123

    
124
  def testParserNetIP4(self):
125
    """Test drbdsetup show parser for IPv4 network"""
126
    data = self._ReadTestData("bdev-drbd-net-ip4.txt")
127
    result = bdev.DRBD8._GetDevInfo(data)
128
    self.failUnless(("local_dev" not in result and
129
                     "meta_dev" not in result and
130
                     "meta_index" not in result),
131
                    "Should not find local disk info")
132
    self.failUnless(self._has_net(result, ("192.0.2.1", 11002),
133
                                  ("192.0.2.2", 11002)),
134
                    "Wrong network info (IPv4)")
135

    
136
  def testParserNetIP6(self):
137
    """Test drbdsetup show parser for IPv6 network"""
138
    data = self._ReadTestData("bdev-drbd-net-ip6.txt")
139
    result = bdev.DRBD8._GetDevInfo(data)
140
    self.failUnless(("local_dev" not in result and
141
                     "meta_dev" not in result and
142
                     "meta_index" not in result),
143
                    "Should not find local disk info")
144
    self.failUnless(self._has_net(result, ("2001:db8:65::1", 11048),
145
                                  ("2001:db8:66::1", 11048)),
146
                    "Wrong network info (IPv6)")
147

    
148
  def testParserDisk(self):
149
    """Test drbdsetup show parser for disk"""
150
    data = self._ReadTestData("bdev-drbd-disk.txt")
151
    result = bdev.DRBD8._GetDevInfo(data)
152
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
153
                                   "/dev/xenvg/test.meta"),
154
                    "Wrong local disk info")
155
    self.failUnless(("local_addr" not in result and
156
                     "remote_addr" not in result),
157
                    "Should not find network info")
158

    
159

    
160
class TestDRBD8Status(testutils.GanetiTestCase):
161
  """Testing case for DRBD8 /proc status"""
162

    
163
  def setUp(self):
164
    """Read in txt data"""
165
    testutils.GanetiTestCase.setUp(self)
166
    proc_data = self._TestDataFilename("proc_drbd8.txt")
167
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
168
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
169
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
170
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
171
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
172
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
173
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
174
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
175

    
176
  def testIOErrors(self):
177
    """Test handling of errors while reading the proc file."""
178
    temp_file = self._CreateTempFile()
179
    os.unlink(temp_file)
180
    self.failUnlessRaises(errors.BlockDeviceError,
181
                          bdev.DRBD8._GetProcData, filename=temp_file)
182

    
183
  def testHelper(self):
184
    """Test reading usermode_helper in /sys."""
185
    sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
186
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
187
    self.failUnlessEqual(drbd_helper, "/bin/true")
188

    
189
  def testHelperIOErrors(self):
190
    """Test handling of errors while reading usermode_helper in /sys."""
191
    temp_file = self._CreateTempFile()
192
    os.unlink(temp_file)
193
    self.failUnlessRaises(errors.BlockDeviceError,
194
                          bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
195

    
196
  def testMinorNotFound(self):
197
    """Test not-found-minor in /proc"""
198
    self.failUnless(9 not in self.mass_data)
199
    self.failUnless(9 not in self.mass83_data)
200
    self.failUnless(3 not in self.mass80e_data)
201

    
202
  def testLineNotMatch(self):
203
    """Test wrong line passed to DRBD8Status"""
204
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
205

    
206
  def testMinor0(self):
207
    """Test connected, primary device"""
208
    for data in [self.mass_data, self.mass83_data]:
209
      stats = bdev.DRBD8Status(data[0])
210
      self.failUnless(stats.is_in_use)
211
      self.failUnless(stats.is_connected and stats.is_primary and
212
                      stats.peer_secondary and stats.is_disk_uptodate)
213

    
214
  def testMinor1(self):
215
    """Test connected, secondary device"""
216
    for data in [self.mass_data, self.mass83_data]:
217
      stats = bdev.DRBD8Status(data[1])
218
      self.failUnless(stats.is_in_use)
219
      self.failUnless(stats.is_connected and stats.is_secondary and
220
                      stats.peer_primary and stats.is_disk_uptodate)
221

    
222
  def testMinor2(self):
223
    """Test unconfigured device"""
224
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
225
      stats = bdev.DRBD8Status(data[2])
226
      self.failIf(stats.is_in_use)
227

    
228
  def testMinor4(self):
229
    """Test WFconn device"""
230
    for data in [self.mass_data, self.mass83_data]:
231
      stats = bdev.DRBD8Status(data[4])
232
      self.failUnless(stats.is_in_use)
233
      self.failUnless(stats.is_wfconn and stats.is_primary and
234
                      stats.rrole == 'Unknown' and
235
                      stats.is_disk_uptodate)
236

    
237
  def testMinor6(self):
238
    """Test diskless device"""
239
    for data in [self.mass_data, self.mass83_data]:
240
      stats = bdev.DRBD8Status(data[6])
241
      self.failUnless(stats.is_in_use)
242
      self.failUnless(stats.is_connected and stats.is_secondary and
243
                      stats.peer_primary and stats.is_diskless)
244

    
245
  def testMinor8(self):
246
    """Test standalone device"""
247
    for data in [self.mass_data, self.mass83_data]:
248
      stats = bdev.DRBD8Status(data[8])
249
      self.failUnless(stats.is_in_use)
250
      self.failUnless(stats.is_standalone and
251
                      stats.rrole == 'Unknown' and
252
                      stats.is_disk_uptodate)
253

    
254
if __name__ == '__main__':
255
  testutils.GanetiTestProgram()