Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.bdev_unittest.py @ fa10bdc5

History | View | Annotate | Download (5.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import unittest
27

    
28
from ganeti import bdev
29
from ganeti import errors
30

    
31

    
32
class TestDRBD8Runner(unittest.TestCase):
33
  """Testing case for DRBD8"""
34

    
35
  @staticmethod
36
  def _has_disk(data, dname, mname):
37
    """Check local disk corectness"""
38
    retval = (
39
      "local_dev" in data and
40
      data["local_dev"] == dname and
41
      "meta_dev" in data and
42
      data["meta_dev"] == mname and
43
      "meta_index" in data and
44
      data["meta_index"] == 0
45
      )
46
    return retval
47

    
48
  @staticmethod
49
  def _get_contents(name):
50
    """Returns the contents of a file"""
51

    
52
    prefix = os.environ.get("srcdir", None)
53
    if prefix:
54
      name = prefix + "/test/" + name
55
    fh = open(name, "r")
56
    try:
57
      data = fh.read()
58
    finally:
59
      fh.close()
60
    return data
61

    
62
  @staticmethod
63
  def _has_net(data, local, remote):
64
    """Check network connection parameters"""
65
    retval = (
66
      "local_addr" in data and
67
      data["local_addr"] == local and
68
      "remote_addr" in data and
69
      data["remote_addr"] == remote
70
      )
71
    return retval
72

    
73
  def testParserCreation(self):
74
    """Test drbdsetup show parser creation"""
75
    bdev.DRBD8._GetShowParser()
76

    
77
  def testParserBoth(self):
78
    """Test drbdsetup show parser for disk and network"""
79
    data = self._get_contents("data/bdev-both.txt")
80
    result = bdev.DRBD8._GetDevInfo(data)
81
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
82
                                   "/dev/xenvg/test.meta"),
83
                    "Wrong local disk info")
84
    self.failUnless(self._has_net(result, ("192.168.1.1", 11000),
85
                                  ("192.168.1.2", 11000)),
86
                    "Wrong network info")
87

    
88
  def testParserNet(self):
89
    """Test drbdsetup show parser for disk and network"""
90
    data = self._get_contents("data/bdev-net.txt")
91
    result = bdev.DRBD8._GetDevInfo(data)
92
    self.failUnless(("local_dev" not in result and
93
                     "meta_dev" not in result and
94
                     "meta_index" not in result),
95
                    "Should not find local disk info")
96
    self.failUnless(self._has_net(result, ("192.168.1.1", 11002),
97
                                  ("192.168.1.2", 11002)),
98
                    "Wrong network info")
99

    
100
  def testParserDisk(self):
101
    """Test drbdsetup show parser for disk and network"""
102
    data = self._get_contents("data/bdev-disk.txt")
103
    result = bdev.DRBD8._GetDevInfo(data)
104
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
105
                                   "/dev/xenvg/test.meta"),
106
                    "Wrong local disk info")
107
    self.failUnless(("local_addr" not in result and
108
                     "remote_addr" not in result),
109
                    "Should not find network info")
110

    
111

    
112
class TestDRBD8Status(unittest.TestCase):
113
  """Testing case for DRBD8 /proc status"""
114

    
115
  def setUp(self):
116
    """Read in txt data"""
117
    proc_data = "test/data/proc_drbd8.txt"
118
    prefix = os.environ.get("srcdir", None)
119
    if prefix:
120
      proc_data = prefix + "/" + proc_data
121
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
122
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
123

    
124
  def testMinorNotFound(self):
125
    """Test not-found-minor in /proc"""
126
    self.failUnless(9 not in self.mass_data)
127

    
128
  def testLineNotMatch(self):
129
    """Test wrong line passed to DRBD8Status"""
130
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
131

    
132
  def testMinor0(self):
133
    """Test connected, primary device"""
134
    stats = bdev.DRBD8Status(self.mass_data[0])
135
    self.failUnless(stats.is_connected and stats.is_primary and
136
                    stats.peer_secondary and stats.is_disk_uptodate)
137

    
138
  def testMinor1(self):
139
    """Test connected, secondary device"""
140
    stats = bdev.DRBD8Status(self.mass_data[1])
141
    self.failUnless(stats.is_connected and stats.is_secondary and
142
                    stats.peer_primary and stats.is_disk_uptodate)
143

    
144
  def testMinor4(self):
145
    """Test WFconn device"""
146
    stats = bdev.DRBD8Status(self.mass_data[4])
147
    self.failUnless(stats.is_wfconn and stats.is_primary and
148
                    stats.rrole == 'Unknown' and
149
                    stats.is_disk_uptodate)
150

    
151
  def testMinor6(self):
152
    """Test diskless device"""
153
    stats = bdev.DRBD8Status(self.mass_data[6])
154
    self.failUnless(stats.is_connected and stats.is_secondary and
155
                    stats.peer_primary and stats.is_diskless)
156

    
157
  def testMinor8(self):
158
    """Test standalone device"""
159
    stats = bdev.DRBD8Status(self.mass_data[8])
160
    self.failUnless(stats.is_standalone and
161
                    stats.rrole == 'Unknown' and
162
                    stats.is_disk_uptodate)
163

    
164
if __name__ == '__main__':
165
  unittest.main()