Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.impexpd_unittest.py @ 1d3dfa29

History | View | Annotate | Download (5.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.impexpd"""
23

    
24
import os
25
import sys
26
import re
27
import unittest
28

    
29
from ganeti import constants
30
from ganeti import objects
31
from ganeti import compat
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import impexpd
35

    
36
import testutils
37

    
38

    
39
class CmdBuilderConfig(objects.ConfigObject):
40
  __slots__ = [
41
    "bind",
42
    "key",
43
    "cert",
44
    "ca",
45
    "host",
46
    "port",
47
    "compress",
48
    "magic",
49
    "connect_timeout",
50
    "connect_retries",
51
    "cmd_prefix",
52
    "cmd_suffix",
53
    ]
54

    
55

    
56
def CheckCmdWord(cmd, word):
57
  wre = re.compile(r"\b%s\b" % re.escape(word))
58
  return compat.any(wre.search(i) for i in cmd)
59

    
60

    
61
class TestCommandBuilder(unittest.TestCase):
62
  def test(self):
63
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
64
      if mode == constants.IEM_IMPORT:
65
        comprcmd = "gunzip"
66
      elif mode == constants.IEM_EXPORT:
67
        comprcmd = "gzip"
68

    
69
      for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
70
        for magic in [None, 10 * "-", "HelloWorld", "J9plh4nFo2",
71
                      "24A02A81-2264-4B51-A882-A2AB9D85B420"]:
72
          opts = CmdBuilderConfig(magic=magic, compress=compress)
73
          builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
74

    
75
          magic_cmd = builder._GetMagicCommand()
76
          dd_cmd = builder._GetDdCommand()
77

    
78
          if magic:
79
            self.assert_(("M=%s" % magic) in magic_cmd)
80
            self.assert_(("M=%s" % magic) in dd_cmd)
81
          else:
82
            self.assertFalse(magic_cmd)
83

    
84
        for host in ["localhost", "1.2.3.4", "192.0.2.99"]:
85
          for port in [0, 1, 1234, 7856, 45452]:
86
            for cmd_prefix in [None, "PrefixCommandGoesHere|",
87
                               "dd if=/dev/hda bs=1048576 |"]:
88
              for cmd_suffix in [None, "< /some/file/name",
89
                                 "| dd of=/dev/null"]:
90
                opts = CmdBuilderConfig(host=host, port=port, compress=compress,
91
                                        cmd_prefix=cmd_prefix,
92
                                        cmd_suffix=cmd_suffix)
93

    
94
                builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
95

    
96
                # Check complete command
97
                cmd = builder.GetCommand()
98
                self.assert_(isinstance(cmd, list))
99

    
100
                if compress == constants.IEC_GZIP:
101
                  self.assert_(CheckCmdWord(cmd, comprcmd))
102

    
103
                if cmd_prefix is not None:
104
                  self.assert_(cmd_prefix in i for i in cmd)
105

    
106
                if cmd_suffix is not None:
107
                  self.assert_(cmd_suffix in i for i in cmd)
108

    
109
                # Check socat command
110
                socat_cmd = builder._GetSocatCommand()
111

    
112
                if mode == constants.IEM_IMPORT:
113
                  ssl_addr = socat_cmd[-2].split(",")
114
                  self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
115
                elif mode == constants.IEM_EXPORT:
116
                  ssl_addr = socat_cmd[-1].split(",")
117
                  self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
118

    
119
                self.assert_("verify=1" in ssl_addr)
120

    
121
  def testCommaError(self):
122
    opts = CmdBuilderConfig(host="localhost", port=1234,
123
                            ca="/some/path/with,a/,comma")
124

    
125
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
126
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
127
      self.assertRaises(errors.GenericError, builder.GetCommand)
128

    
129
  def testOptionLengthError(self):
130
    testopts = [
131
      CmdBuilderConfig(bind="0.0.0.0" + ("A" * impexpd.SOCAT_OPTION_MAXLEN),
132
                       port=1234, ca="/tmp/ca"),
133
      CmdBuilderConfig(host="localhost", port=1234,
134
                       ca="/tmp/ca" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
135
      CmdBuilderConfig(host="localhost", port=1234,
136
                       key="/tmp/key" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
137
      ]
138

    
139
    for opts in testopts:
140
      for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
141
        builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
142
        self.assertRaises(errors.GenericError, builder.GetCommand)
143

    
144
      opts.host = "localhost" + ("A" * impexpd.SOCAT_OPTION_MAXLEN)
145
      builder = impexpd.CommandBuilder(constants.IEM_EXPORT, opts, 1, 2, 3)
146
      self.assertRaises(errors.GenericError, builder.GetCommand)
147

    
148
  def testModeError(self):
149
    mode = "foobarbaz"
150

    
151
    assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
152

    
153
    opts = CmdBuilderConfig(host="localhost", port=1234)
154
    builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
155
    self.assertRaises(errors.GenericError, builder.GetCommand)
156

    
157

    
158
class TestCalcThroughput(unittest.TestCase):
159
  def test(self):
160
    self.assertEqual(impexpd._CalcThroughput([]), None)
161
    self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None)
162

    
163
    samples = [
164
      (0.0, 0.0),
165
      (10.0, 100.0),
166
      ]
167
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3)
168

    
169
    samples = [
170
      (5.0, 7.0),
171
      (10.0, 100.0),
172
      (16.0, 181.0),
173
      ]
174
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3)
175

    
176

    
177
if __name__ == "__main__":
178
  testutils.GanetiTestProgram()