Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.bdev_unittest.py @ d357f531

History | View | Annotate | Download (7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import unittest
27

    
28
import testutils
29
from ganeti import bdev
30
from ganeti import errors
31

    
32

    
33
class TestDRBD8Runner(testutils.GanetiTestCase):
34
  """Testing case for DRBD8"""
35

    
36
  @staticmethod
37
  def _has_disk(data, dname, mname):
38
    """Check local disk corectness"""
39
    retval = (
40
      "local_dev" in data and
41
      data["local_dev"] == dname and
42
      "meta_dev" in data and
43
      data["meta_dev"] == mname and
44
      "meta_index" in data and
45
      data["meta_index"] == 0
46
      )
47
    return retval
48

    
49
  @staticmethod
50
  def _has_net(data, local, remote):
51
    """Check network connection parameters"""
52
    retval = (
53
      "local_addr" in data and
54
      data["local_addr"] == local and
55
      "remote_addr" in data and
56
      data["remote_addr"] == remote
57
      )
58
    return retval
59

    
60
  def testParserCreation(self):
61
    """Test drbdsetup show parser creation"""
62
    bdev.DRBD8._GetShowParser()
63

    
64
  def testParserBoth80(self):
65
    """Test drbdsetup show parser for disk and network"""
66
    data = self._ReadTestData("bdev-both.txt")
67
    result = bdev.DRBD8._GetDevInfo(data)
68
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
69
                                   "/dev/xenvg/test.meta"),
70
                    "Wrong local disk info")
71
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
72
                                  ("192.168.1.2", 11000)),
73
                    "Wrong network info (8.0.x)")
74

    
75
  def testParserBoth83(self):
76
    """Test drbdsetup show parser for disk and network"""
77
    data = self._ReadTestData("bdev-8.3-both.txt")
78
    result = bdev.DRBD8._GetDevInfo(data)
79
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
80
                                   "/dev/xenvg/test.meta"),
81
                    "Wrong local disk info")
82
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
83
                                  ("192.168.1.2", 11000)),
84
                    "Wrong network info (8.2.x)")
85

    
86
  def testParserNet(self):
87
    """Test drbdsetup show parser for disk and network"""
88
    data = self._ReadTestData("bdev-net.txt")
89
    result = bdev.DRBD8._GetDevInfo(data)
90
    self.failUnless(("local_dev" not in result and
91
                     "meta_dev" not in result and
92
                     "meta_index" not in result),
93
                    "Should not find local disk info")
94
    self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
95
                                  ("192.168.1.2", 11002)),
96
                    "Wrong network info")
97

    
98
  def testParserDisk(self):
99
    """Test drbdsetup show parser for disk and network"""
100
    data = self._ReadTestData("bdev-disk.txt")
101
    result = bdev.DRBD8._GetDevInfo(data)
102
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
103
                                   "/dev/xenvg/test.meta"),
104
                    "Wrong local disk info")
105
    self.failUnless(("local_addr" not in result and
106
                     "remote_addr" not in result),
107
                    "Should not find network info")
108

    
109

    
110
class TestDRBD8Status(testutils.GanetiTestCase):
111
  """Testing case for DRBD8 /proc status"""
112

    
113
  def setUp(self):
114
    """Read in txt data"""
115
    testutils.GanetiTestCase.setUp(self)
116
    proc_data = self._TestDataFilename("proc_drbd8.txt")
117
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
118
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
119
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
120
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
121
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
122
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
123
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
124
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
125

    
126
  def testIOErrors(self):
127
    """Test handling of errors while reading the proc file."""
128
    temp_file = self._CreateTempFile()
129
    os.unlink(temp_file)
130
    self.failUnlessRaises(errors.BlockDeviceError,
131
                          bdev.DRBD8._GetProcData, filename=temp_file)
132

    
133
  def testMinorNotFound(self):
134
    """Test not-found-minor in /proc"""
135
    self.failUnless(9 not in self.mass_data)
136
    self.failUnless(9 not in self.mass83_data)
137
    self.failUnless(3 not in self.mass80e_data)
138

    
139
  def testLineNotMatch(self):
140
    """Test wrong line passed to DRBD8Status"""
141
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
142

    
143
  def testMinor0(self):
144
    """Test connected, primary device"""
145
    for data in [self.mass_data, self.mass83_data]:
146
      stats = bdev.DRBD8Status(data[0])
147
      self.failUnless(stats.is_in_use)
148
      self.failUnless(stats.is_connected and stats.is_primary and
149
                      stats.peer_secondary and stats.is_disk_uptodate)
150

    
151
  def testMinor1(self):
152
    """Test connected, secondary device"""
153
    for data in [self.mass_data, self.mass83_data]:
154
      stats = bdev.DRBD8Status(data[1])
155
      self.failUnless(stats.is_in_use)
156
      self.failUnless(stats.is_connected and stats.is_secondary and
157
                      stats.peer_primary and stats.is_disk_uptodate)
158

    
159
  def testMinor2(self):
160
    """Test unconfigured device"""
161
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
162
      stats = bdev.DRBD8Status(data[2])
163
      self.failIf(stats.is_in_use)
164

    
165
  def testMinor4(self):
166
    """Test WFconn device"""
167
    for data in [self.mass_data, self.mass83_data]:
168
      stats = bdev.DRBD8Status(data[4])
169
      self.failUnless(stats.is_in_use)
170
      self.failUnless(stats.is_wfconn and stats.is_primary and
171
                      stats.rrole == 'Unknown' and
172
                      stats.is_disk_uptodate)
173

    
174
  def testMinor6(self):
175
    """Test diskless device"""
176
    for data in [self.mass_data, self.mass83_data]:
177
      stats = bdev.DRBD8Status(data[6])
178
      self.failUnless(stats.is_in_use)
179
      self.failUnless(stats.is_connected and stats.is_secondary and
180
                      stats.peer_primary and stats.is_diskless)
181

    
182
  def testMinor8(self):
183
    """Test standalone device"""
184
    for data in [self.mass_data, self.mass83_data]:
185
      stats = bdev.DRBD8Status(data[8])
186
      self.failUnless(stats.is_in_use)
187
      self.failUnless(stats.is_standalone and
188
                      stats.rrole == 'Unknown' and
189
                      stats.is_disk_uptodate)
190

    
191
if __name__ == '__main__':
192
  unittest.main()