Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.impexpd_unittest.py @ 0559f745

History | View | Annotate | Download (5.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.impexpd"""
23

    
24
import os
25
import sys
26
import re
27
import unittest
28

    
29
from ganeti import constants
30
from ganeti import objects
31
from ganeti import compat
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import impexpd
35

    
36
import testutils
37

    
38

    
39
class CmdBuilderConfig(objects.ConfigObject):
40
  __slots__ = [
41
    "bind",
42
    "key",
43
    "cert",
44
    "ca",
45
    "host",
46
    "port",
47
    "compress",
48
    "connect_timeout",
49
    "connect_retries",
50
    "cmd_prefix",
51
    "cmd_suffix",
52
    ]
53

    
54

    
55
def CheckCmdWord(cmd, word):
56
  wre = re.compile(r"\b%s\b" % re.escape(word))
57
  return compat.any(wre.search(i) for i in cmd)
58

    
59

    
60
class TestCommandBuilder(unittest.TestCase):
61
  def test(self):
62
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
63
      if mode == constants.IEM_IMPORT:
64
        comprcmd = "gunzip"
65
      elif mode == constants.IEM_EXPORT:
66
        comprcmd = "gzip"
67

    
68
      for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
69
        for host in ["localhost", "1.2.3.4", "192.0.2.99"]:
70
          for port in [0, 1, 1234, 7856, 45452]:
71
            for cmd_prefix in [None, "PrefixCommandGoesHere|",
72
                               "dd if=/dev/hda bs=1048576 |"]:
73
              for cmd_suffix in [None, "< /some/file/name",
74
                                 "| dd of=/dev/null"]:
75
                opts = CmdBuilderConfig(host=host, port=port, compress=compress,
76
                                        cmd_prefix=cmd_prefix,
77
                                        cmd_suffix=cmd_suffix)
78

    
79
                builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
80

    
81
                # Check complete command
82
                cmd = builder.GetCommand()
83
                self.assert_(isinstance(cmd, list))
84

    
85
                if compress == constants.IEC_GZIP:
86
                  self.assert_(CheckCmdWord(cmd, comprcmd))
87

    
88
                if cmd_prefix is not None:
89
                  self.assert_(cmd_prefix in i for i in cmd)
90

    
91
                if cmd_suffix is not None:
92
                  self.assert_(cmd_suffix in i for i in cmd)
93

    
94
                # Check socat command
95
                socat_cmd = builder._GetSocatCommand()
96

    
97
                if mode == constants.IEM_IMPORT:
98
                  ssl_addr = socat_cmd[-2].split(",")
99
                  self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
100
                elif mode == constants.IEM_EXPORT:
101
                  ssl_addr = socat_cmd[-1].split(",")
102
                  self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
103

    
104
                self.assert_("verify=1" in ssl_addr)
105

    
106
  def testCommaError(self):
107
    opts = CmdBuilderConfig(host="localhost", port=1234,
108
                            ca="/some/path/with,a/,comma")
109

    
110
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
111
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
112
      self.assertRaises(errors.GenericError, builder.GetCommand)
113

    
114
  def testOptionLengthError(self):
115
    testopts = [
116
      CmdBuilderConfig(bind="0.0.0.0" + ("A" * impexpd.SOCAT_OPTION_MAXLEN),
117
                       port=1234, ca="/tmp/ca"),
118
      CmdBuilderConfig(host="localhost", port=1234,
119
                       ca="/tmp/ca" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
120
      CmdBuilderConfig(host="localhost", port=1234,
121
                       key="/tmp/key" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
122
      ]
123

    
124
    for opts in testopts:
125
      for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
126
        builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
127
        self.assertRaises(errors.GenericError, builder.GetCommand)
128

    
129
      opts.host = "localhost" + ("A" * impexpd.SOCAT_OPTION_MAXLEN)
130
      builder = impexpd.CommandBuilder(constants.IEM_EXPORT, opts, 1, 2, 3)
131
      self.assertRaises(errors.GenericError, builder.GetCommand)
132

    
133
  def testModeError(self):
134
    mode = "foobarbaz"
135

    
136
    assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
137

    
138
    opts = CmdBuilderConfig(host="localhost", port=1234)
139
    builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
140
    self.assertRaises(errors.GenericError, builder.GetCommand)
141

    
142

    
143
class TestCalcThroughput(unittest.TestCase):
144
  def test(self):
145
    self.assertEqual(impexpd._CalcThroughput([]), None)
146
    self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None)
147

    
148
    samples = [
149
      (0.0, 0.0),
150
      (10.0, 100.0),
151
      ]
152
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3)
153

    
154
    samples = [
155
      (5.0, 7.0),
156
      (10.0, 100.0),
157
      (16.0, 181.0),
158
      ]
159
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3)
160

    
161

    
162
if __name__ == "__main__":
163
  testutils.GanetiTestProgram()