Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.bdev_unittest.py @ f6eaed12

History | View | Annotate | Download (5.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import unittest
27

    
28
import testutils
29
from ganeti import bdev
30
from ganeti import errors
31

    
32

    
33
class TestDRBD8Runner(testutils.GanetiTestCase):
34
  """Testing case for DRBD8"""
35

    
36
  @staticmethod
37
  def _has_disk(data, dname, mname):
38
    """Check local disk corectness"""
39
    retval = (
40
      "local_dev" in data and
41
      data["local_dev"] == dname and
42
      "meta_dev" in data and
43
      data["meta_dev"] == mname and
44
      "meta_index" in data and
45
      data["meta_index"] == 0
46
      )
47
    return retval
48

    
49
  @staticmethod
50
  def _has_net(data, local, remote):
51
    """Check network connection parameters"""
52
    retval = (
53
      "local_addr" in data and
54
      data["local_addr"] == local and
55
      "remote_addr" in data and
56
      data["remote_addr"] == remote
57
      )
58
    return retval
59

    
60
  def testParserCreation(self):
61
    """Test drbdsetup show parser creation"""
62
    bdev.DRBD8._GetShowParser()
63

    
64
  def testParserBoth(self):
65
    """Test drbdsetup show parser for disk and network"""
66
    data = self._ReadTestData("bdev-both.txt")
67
    result = bdev.DRBD8._GetDevInfo(data)
68
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
69
                                   "/dev/xenvg/test.meta"),
70
                    "Wrong local disk info")
71
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
72
                                  ("192.168.1.2", 11000)),
73
                    "Wrong network info")
74

    
75
  def testParserNet(self):
76
    """Test drbdsetup show parser for disk and network"""
77
    data = self._ReadTestData("bdev-net.txt")
78
    result = bdev.DRBD8._GetDevInfo(data)
79
    self.failUnless(("local_dev" not in result and
80
                     "meta_dev" not in result and
81
                     "meta_index" not in result),
82
                    "Should not find local disk info")
83
    self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
84
                                  ("192.168.1.2", 11002)),
85
                    "Wrong network info")
86

    
87
  def testParserDisk(self):
88
    """Test drbdsetup show parser for disk and network"""
89
    data = self._ReadTestData("bdev-disk.txt")
90
    result = bdev.DRBD8._GetDevInfo(data)
91
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
92
                                   "/dev/xenvg/test.meta"),
93
                    "Wrong local disk info")
94
    self.failUnless(("local_addr" not in result and
95
                     "remote_addr" not in result),
96
                    "Should not find network info")
97

    
98

    
99
class TestDRBD8Status(testutils.GanetiTestCase):
100
  """Testing case for DRBD8 /proc status"""
101

    
102
  def setUp(self):
103
    """Read in txt data"""
104
    testutils.GanetiTestCase.setUp(self)
105
    proc_data = self._TestDataFilename("proc_drbd8.txt")
106
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
107
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
108

    
109
  def testIOErrors(self):
110
    """Test handling of errors while reading the proc file."""
111
    temp_file = self._CreateTempFile()
112
    os.unlink(temp_file)
113
    self.failUnlessRaises(errors.BlockDeviceError,
114
                          bdev.DRBD8._GetProcData, filename=temp_file)
115

    
116
  def testMinorNotFound(self):
117
    """Test not-found-minor in /proc"""
118
    self.failUnless(9 not in self.mass_data)
119

    
120
  def testLineNotMatch(self):
121
    """Test wrong line passed to DRBD8Status"""
122
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
123

    
124
  def testMinor0(self):
125
    """Test connected, primary device"""
126
    stats = bdev.DRBD8Status(self.mass_data[0])
127
    self.failUnless(stats.is_in_use)
128
    self.failUnless(stats.is_connected and stats.is_primary and
129
                    stats.peer_secondary and stats.is_disk_uptodate)
130

    
131
  def testMinor1(self):
132
    """Test connected, secondary device"""
133
    stats = bdev.DRBD8Status(self.mass_data[1])
134
    self.failUnless(stats.is_in_use)
135
    self.failUnless(stats.is_connected and stats.is_secondary and
136
                    stats.peer_primary and stats.is_disk_uptodate)
137

    
138
  def testMinor2(self):
139
    """Test unconfigured device"""
140
    stats = bdev.DRBD8Status(self.mass_data[2])
141
    self.failIf(stats.is_in_use)
142

    
143
  def testMinor4(self):
144
    """Test WFconn device"""
145
    stats = bdev.DRBD8Status(self.mass_data[4])
146
    self.failUnless(stats.is_in_use)
147
    self.failUnless(stats.is_wfconn and stats.is_primary and
148
                    stats.rrole == 'Unknown' and
149
                    stats.is_disk_uptodate)
150

    
151
  def testMinor6(self):
152
    """Test diskless device"""
153
    stats = bdev.DRBD8Status(self.mass_data[6])
154
    self.failUnless(stats.is_in_use)
155
    self.failUnless(stats.is_connected and stats.is_secondary and
156
                    stats.peer_primary and stats.is_diskless)
157

    
158
  def testMinor8(self):
159
    """Test standalone device"""
160
    stats = bdev.DRBD8Status(self.mass_data[8])
161
    self.failUnless(stats.is_in_use)
162
    self.failUnless(stats.is_standalone and
163
                    stats.rrole == 'Unknown' and
164
                    stats.is_disk_uptodate)
165

    
166
if __name__ == '__main__':
167
  unittest.main()