Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.impexpd_unittest.py @ d01e51a5

History | View | Annotate | Download (8.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.impexpd"""
23

    
24
import os
25
import sys
26
import re
27
import unittest
28
import socket
29

    
30
from ganeti import constants
31
from ganeti import objects
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import impexpd
36

    
37
import testutils
38

    
39

    
40
class CmdBuilderConfig(objects.ConfigObject):
41
  __slots__ = [
42
    "bind",
43
    "key",
44
    "cert",
45
    "ca",
46
    "host",
47
    "port",
48
    "ipv4",
49
    "ipv6",
50
    "compress",
51
    "magic",
52
    "connect_timeout",
53
    "connect_retries",
54
    "cmd_prefix",
55
    "cmd_suffix",
56
    ]
57

    
58

    
59
def CheckCmdWord(cmd, word):
60
  wre = re.compile(r"\b%s\b" % re.escape(word))
61
  return compat.any(wre.search(i) for i in cmd)
62

    
63

    
64
class TestCommandBuilder(unittest.TestCase):
65
  def test(self):
66
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
67
      if mode == constants.IEM_IMPORT:
68
        comprcmd = "gunzip"
69
      elif mode == constants.IEM_EXPORT:
70
        comprcmd = "gzip"
71

    
72
      for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
73
        for magic in [None, 10 * "-", "HelloWorld", "J9plh4nFo2",
74
                      "24A02A81-2264-4B51-A882-A2AB9D85B420"]:
75
          opts = CmdBuilderConfig(magic=magic, compress=compress)
76
          builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
77

    
78
          magic_cmd = builder._GetMagicCommand()
79
          dd_cmd = builder._GetDdCommand()
80

    
81
          if magic:
82
            self.assert_(("M=%s" % magic) in magic_cmd)
83
            self.assert_(("M=%s" % magic) in dd_cmd)
84
          else:
85
            self.assertFalse(magic_cmd)
86

    
87
        for host in ["localhost", "198.51.100.4", "192.0.2.99"]:
88
          for port in [0, 1, 1234, 7856, 45452]:
89
            for cmd_prefix in [None, "PrefixCommandGoesHere|",
90
                               "dd if=/dev/hda bs=1048576 |"]:
91
              for cmd_suffix in [None, "< /some/file/name",
92
                                 "| dd of=/dev/null"]:
93
                opts = CmdBuilderConfig(host=host, port=port, compress=compress,
94
                                        cmd_prefix=cmd_prefix,
95
                                        cmd_suffix=cmd_suffix)
96

    
97
                builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
98

    
99
                # Check complete command
100
                cmd = builder.GetCommand()
101
                self.assert_(isinstance(cmd, list))
102

    
103
                if compress == constants.IEC_GZIP:
104
                  self.assert_(CheckCmdWord(cmd, comprcmd))
105

    
106
                if cmd_prefix is not None:
107
                  self.assert_(compat.any(cmd_prefix in i for i in cmd))
108

    
109
                if cmd_suffix is not None:
110
                  self.assert_(compat.any(cmd_suffix in i for i in cmd))
111

    
112
                # Check socat command
113
                socat_cmd = builder._GetSocatCommand()
114

    
115
                if mode == constants.IEM_IMPORT:
116
                  ssl_addr = socat_cmd[-2].split(",")
117
                  self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
118
                elif mode == constants.IEM_EXPORT:
119
                  ssl_addr = socat_cmd[-1].split(",")
120
                  self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
121

    
122
                self.assert_("verify=1" in ssl_addr)
123

    
124
  def testIPv6(self):
125
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
126
      opts = CmdBuilderConfig(host="localhost", port=6789,
127
                              ipv4=False, ipv6=False)
128
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
129
      cmd = builder._GetSocatCommand()
130
      self.assert_(compat.all("pf=" not in i for i in cmd))
131

    
132
      # IPv4
133
      opts = CmdBuilderConfig(host="localhost", port=6789,
134
                              ipv4=True, ipv6=False)
135
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
136
      cmd = builder._GetSocatCommand()
137
      self.assert_(compat.any(",pf=ipv4" in i for i in cmd))
138

    
139
      # IPv6
140
      opts = CmdBuilderConfig(host="localhost", port=6789,
141
                              ipv4=False, ipv6=True)
142
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
143
      cmd = builder._GetSocatCommand()
144
      self.assert_(compat.any(",pf=ipv6" in i for i in cmd))
145

    
146
      # IPv4 and IPv6
147
      opts = CmdBuilderConfig(host="localhost", port=6789,
148
                              ipv4=True, ipv6=True)
149
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
150
      self.assertRaises(AssertionError, builder._GetSocatCommand)
151

    
152
  def testCommaError(self):
153
    opts = CmdBuilderConfig(host="localhost", port=1234,
154
                            ca="/some/path/with,a/,comma")
155

    
156
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
157
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
158
      self.assertRaises(errors.GenericError, builder.GetCommand)
159

    
160
  def testOptionLengthError(self):
161
    testopts = [
162
      CmdBuilderConfig(bind="0.0.0.0" + ("A" * impexpd.SOCAT_OPTION_MAXLEN),
163
                       port=1234, ca="/tmp/ca"),
164
      CmdBuilderConfig(host="localhost", port=1234,
165
                       ca="/tmp/ca" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
166
      CmdBuilderConfig(host="localhost", port=1234,
167
                       key="/tmp/key" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
168
      ]
169

    
170
    for opts in testopts:
171
      for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
172
        builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
173
        self.assertRaises(errors.GenericError, builder.GetCommand)
174

    
175
      opts.host = "localhost" + ("A" * impexpd.SOCAT_OPTION_MAXLEN)
176
      builder = impexpd.CommandBuilder(constants.IEM_EXPORT, opts, 1, 2, 3)
177
      self.assertRaises(errors.GenericError, builder.GetCommand)
178

    
179
  def testModeError(self):
180
    mode = "foobarbaz"
181

    
182
    assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
183

    
184
    opts = CmdBuilderConfig(host="localhost", port=1234)
185
    builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
186
    self.assertRaises(errors.GenericError, builder.GetCommand)
187

    
188

    
189
class TestVerifyListening(unittest.TestCase):
190
  def test(self):
191
    self.assertEqual(impexpd._VerifyListening(socket.AF_INET,
192
                                              "192.0.2.7", 1234),
193
                     ("192.0.2.7", 1234))
194
    self.assertEqual(impexpd._VerifyListening(socket.AF_INET6, "::1", 9876),
195
                     ("::1", 9876))
196
    self.assertEqual(impexpd._VerifyListening(socket.AF_INET6, "[::1]", 4563),
197
                     ("::1", 4563))
198
    self.assertEqual(impexpd._VerifyListening(socket.AF_INET6,
199
                                              "[2001:db8::1:4563]", 4563),
200
                     ("2001:db8::1:4563", 4563))
201

    
202
  def testError(self):
203
    for family in [socket.AF_UNIX, socket.AF_INET, socket.AF_INET6]:
204
      self.assertRaises(errors.GenericError, impexpd._VerifyListening,
205
                        family, "", 1234)
206
      self.assertRaises(errors.GenericError, impexpd._VerifyListening,
207
                        family, "192", 999)
208

    
209
    for family in [socket.AF_UNIX, socket.AF_INET6]:
210
      self.assertRaises(errors.GenericError, impexpd._VerifyListening,
211
                        family, "192.0.2.7", 1234)
212
      self.assertRaises(errors.GenericError, impexpd._VerifyListening,
213
                        family, "[2001:db8::1", 1234)
214
      self.assertRaises(errors.GenericError, impexpd._VerifyListening,
215
                        family, "2001:db8::1]", 1234)
216

    
217
    for family in [socket.AF_UNIX, socket.AF_INET]:
218
      self.assertRaises(errors.GenericError, impexpd._VerifyListening,
219
                        family, "::1", 1234)
220

    
221

    
222
class TestCalcThroughput(unittest.TestCase):
223
  def test(self):
224
    self.assertEqual(impexpd._CalcThroughput([]), None)
225
    self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None)
226

    
227
    samples = [
228
      (0.0, 0.0),
229
      (10.0, 100.0),
230
      ]
231
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3)
232

    
233
    samples = [
234
      (5.0, 7.0),
235
      (10.0, 100.0),
236
      (16.0, 181.0),
237
      ]
238
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3)
239

    
240

    
241
if __name__ == "__main__":
242
  testutils.GanetiTestProgram()