Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.bdev_unittest.py @ 25231ec5

History | View | Annotate | Download (7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import unittest
27

    
28
from ganeti import bdev
29
from ganeti import errors
30

    
31
import testutils
32

    
33

    
34
class TestDRBD8Runner(testutils.GanetiTestCase):
35
  """Testing case for DRBD8"""
36

    
37
  @staticmethod
38
  def _has_disk(data, dname, mname):
39
    """Check local disk corectness"""
40
    retval = (
41
      "local_dev" in data and
42
      data["local_dev"] == dname and
43
      "meta_dev" in data and
44
      data["meta_dev"] == mname and
45
      "meta_index" in data and
46
      data["meta_index"] == 0
47
      )
48
    return retval
49

    
50
  @staticmethod
51
  def _has_net(data, local, remote):
52
    """Check network connection parameters"""
53
    retval = (
54
      "local_addr" in data and
55
      data["local_addr"] == local and
56
      "remote_addr" in data and
57
      data["remote_addr"] == remote
58
      )
59
    return retval
60

    
61
  def testParserCreation(self):
62
    """Test drbdsetup show parser creation"""
63
    bdev.DRBD8._GetShowParser()
64

    
65
  def testParserBoth80(self):
66
    """Test drbdsetup show parser for disk and network"""
67
    data = self._ReadTestData("bdev-both.txt")
68
    result = bdev.DRBD8._GetDevInfo(data)
69
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
70
                                   "/dev/xenvg/test.meta"),
71
                    "Wrong local disk info")
72
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
73
                                  ("192.168.1.2", 11000)),
74
                    "Wrong network info (8.0.x)")
75

    
76
  def testParserBoth83(self):
77
    """Test drbdsetup show parser for disk and network"""
78
    data = self._ReadTestData("bdev-8.3-both.txt")
79
    result = bdev.DRBD8._GetDevInfo(data)
80
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
81
                                   "/dev/xenvg/test.meta"),
82
                    "Wrong local disk info")
83
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
84
                                  ("192.168.1.2", 11000)),
85
                    "Wrong network info (8.2.x)")
86

    
87
  def testParserNet(self):
88
    """Test drbdsetup show parser for disk and network"""
89
    data = self._ReadTestData("bdev-net.txt")
90
    result = bdev.DRBD8._GetDevInfo(data)
91
    self.failUnless(("local_dev" not in result and
92
                     "meta_dev" not in result and
93
                     "meta_index" not in result),
94
                    "Should not find local disk info")
95
    self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
96
                                  ("192.168.1.2", 11002)),
97
                    "Wrong network info")
98

    
99
  def testParserDisk(self):
100
    """Test drbdsetup show parser for disk and network"""
101
    data = self._ReadTestData("bdev-disk.txt")
102
    result = bdev.DRBD8._GetDevInfo(data)
103
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
104
                                   "/dev/xenvg/test.meta"),
105
                    "Wrong local disk info")
106
    self.failUnless(("local_addr" not in result and
107
                     "remote_addr" not in result),
108
                    "Should not find network info")
109

    
110

    
111
class TestDRBD8Status(testutils.GanetiTestCase):
112
  """Testing case for DRBD8 /proc status"""
113

    
114
  def setUp(self):
115
    """Read in txt data"""
116
    testutils.GanetiTestCase.setUp(self)
117
    proc_data = self._TestDataFilename("proc_drbd8.txt")
118
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
119
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
120
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
121
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
122
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
123
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
124
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
125
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
126

    
127
  def testIOErrors(self):
128
    """Test handling of errors while reading the proc file."""
129
    temp_file = self._CreateTempFile()
130
    os.unlink(temp_file)
131
    self.failUnlessRaises(errors.BlockDeviceError,
132
                          bdev.DRBD8._GetProcData, filename=temp_file)
133

    
134
  def testMinorNotFound(self):
135
    """Test not-found-minor in /proc"""
136
    self.failUnless(9 not in self.mass_data)
137
    self.failUnless(9 not in self.mass83_data)
138
    self.failUnless(3 not in self.mass80e_data)
139

    
140
  def testLineNotMatch(self):
141
    """Test wrong line passed to DRBD8Status"""
142
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
143

    
144
  def testMinor0(self):
145
    """Test connected, primary device"""
146
    for data in [self.mass_data, self.mass83_data]:
147
      stats = bdev.DRBD8Status(data[0])
148
      self.failUnless(stats.is_in_use)
149
      self.failUnless(stats.is_connected and stats.is_primary and
150
                      stats.peer_secondary and stats.is_disk_uptodate)
151

    
152
  def testMinor1(self):
153
    """Test connected, secondary device"""
154
    for data in [self.mass_data, self.mass83_data]:
155
      stats = bdev.DRBD8Status(data[1])
156
      self.failUnless(stats.is_in_use)
157
      self.failUnless(stats.is_connected and stats.is_secondary and
158
                      stats.peer_primary and stats.is_disk_uptodate)
159

    
160
  def testMinor2(self):
161
    """Test unconfigured device"""
162
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
163
      stats = bdev.DRBD8Status(data[2])
164
      self.failIf(stats.is_in_use)
165

    
166
  def testMinor4(self):
167
    """Test WFconn device"""
168
    for data in [self.mass_data, self.mass83_data]:
169
      stats = bdev.DRBD8Status(data[4])
170
      self.failUnless(stats.is_in_use)
171
      self.failUnless(stats.is_wfconn and stats.is_primary and
172
                      stats.rrole == 'Unknown' and
173
                      stats.is_disk_uptodate)
174

    
175
  def testMinor6(self):
176
    """Test diskless device"""
177
    for data in [self.mass_data, self.mass83_data]:
178
      stats = bdev.DRBD8Status(data[6])
179
      self.failUnless(stats.is_in_use)
180
      self.failUnless(stats.is_connected and stats.is_secondary and
181
                      stats.peer_primary and stats.is_diskless)
182

    
183
  def testMinor8(self):
184
    """Test standalone device"""
185
    for data in [self.mass_data, self.mass83_data]:
186
      stats = bdev.DRBD8Status(data[8])
187
      self.failUnless(stats.is_in_use)
188
      self.failUnless(stats.is_standalone and
189
                      stats.rrole == 'Unknown' and
190
                      stats.is_disk_uptodate)
191

    
192
if __name__ == '__main__':
193
  testutils.GanetiTestProgram()