4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.impexpd"""
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import impexpd
40 class CmdBuilderConfig(objects.ConfigObject):
59 def CheckCmdWord(cmd, word):
60 wre = re.compile(r"\b%s\b" % re.escape(word))
61 return compat.any(wre.search(i) for i in cmd)
64 class TestCommandBuilder(unittest.TestCase):
66 for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
67 if mode == constants.IEM_IMPORT:
69 elif mode == constants.IEM_EXPORT:
72 for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
73 for magic in [None, 10 * "-", "HelloWorld", "J9plh4nFo2",
74 "24A02A81-2264-4B51-A882-A2AB9D85B420"]:
75 opts = CmdBuilderConfig(magic=magic, compress=compress)
76 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
78 magic_cmd = builder._GetMagicCommand()
79 dd_cmd = builder._GetDdCommand()
82 self.assert_(("M=%s" % magic) in magic_cmd)
83 self.assert_(("M=%s" % magic) in dd_cmd)
85 self.assertFalse(magic_cmd)
87 for host in ["localhost", "198.51.100.4", "192.0.2.99"]:
88 for port in [0, 1, 1234, 7856, 45452]:
89 for cmd_prefix in [None, "PrefixCommandGoesHere|",
90 "dd if=/dev/hda bs=1048576 |"]:
91 for cmd_suffix in [None, "< /some/file/name",
93 opts = CmdBuilderConfig(host=host, port=port, compress=compress,
94 cmd_prefix=cmd_prefix,
95 cmd_suffix=cmd_suffix)
97 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
99 # Check complete command
100 cmd = builder.GetCommand()
101 self.assert_(isinstance(cmd, list))
103 if compress == constants.IEC_GZIP:
104 self.assert_(CheckCmdWord(cmd, comprcmd))
106 if cmd_prefix is not None:
107 self.assert_(compat.any(cmd_prefix in i for i in cmd))
109 if cmd_suffix is not None:
110 self.assert_(compat.any(cmd_suffix in i for i in cmd))
112 # Check socat command
113 socat_cmd = builder._GetSocatCommand()
115 if mode == constants.IEM_IMPORT:
116 ssl_addr = socat_cmd[-2].split(",")
117 self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
118 elif mode == constants.IEM_EXPORT:
119 ssl_addr = socat_cmd[-1].split(",")
120 self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
122 self.assert_("verify=1" in ssl_addr)
125 for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
126 opts = CmdBuilderConfig(host="localhost", port=6789,
127 ipv4=False, ipv6=False)
128 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
129 cmd = builder._GetSocatCommand()
130 self.assert_(compat.all("pf=" not in i for i in cmd))
133 opts = CmdBuilderConfig(host="localhost", port=6789,
134 ipv4=True, ipv6=False)
135 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
136 cmd = builder._GetSocatCommand()
137 self.assert_(compat.any(",pf=ipv4" in i for i in cmd))
140 opts = CmdBuilderConfig(host="localhost", port=6789,
141 ipv4=False, ipv6=True)
142 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
143 cmd = builder._GetSocatCommand()
144 self.assert_(compat.any(",pf=ipv6" in i for i in cmd))
147 opts = CmdBuilderConfig(host="localhost", port=6789,
148 ipv4=True, ipv6=True)
149 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
150 self.assertRaises(AssertionError, builder._GetSocatCommand)
152 def testCommaError(self):
153 opts = CmdBuilderConfig(host="localhost", port=1234,
154 ca="/some/path/with,a/,comma")
156 for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
157 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
158 self.assertRaises(errors.GenericError, builder.GetCommand)
160 def testOptionLengthError(self):
162 CmdBuilderConfig(bind="0.0.0.0" + ("A" * impexpd.SOCAT_OPTION_MAXLEN),
163 port=1234, ca="/tmp/ca"),
164 CmdBuilderConfig(host="localhost", port=1234,
165 ca="/tmp/ca" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
166 CmdBuilderConfig(host="localhost", port=1234,
167 key="/tmp/key" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
170 for opts in testopts:
171 for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
172 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
173 self.assertRaises(errors.GenericError, builder.GetCommand)
175 opts.host = "localhost" + ("A" * impexpd.SOCAT_OPTION_MAXLEN)
176 builder = impexpd.CommandBuilder(constants.IEM_EXPORT, opts, 1, 2, 3)
177 self.assertRaises(errors.GenericError, builder.GetCommand)
179 def testModeError(self):
182 assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
184 opts = CmdBuilderConfig(host="localhost", port=1234)
185 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
186 self.assertRaises(errors.GenericError, builder.GetCommand)
189 class TestVerifyListening(unittest.TestCase):
191 self.assertEqual(impexpd._VerifyListening(socket.AF_INET,
194 self.assertEqual(impexpd._VerifyListening(socket.AF_INET6, "::1", 9876),
196 self.assertEqual(impexpd._VerifyListening(socket.AF_INET6, "[::1]", 4563),
198 self.assertEqual(impexpd._VerifyListening(socket.AF_INET6,
199 "[2001:db8::1:4563]", 4563),
200 ("2001:db8::1:4563", 4563))
203 for family in [socket.AF_UNIX, socket.AF_INET, socket.AF_INET6]:
204 self.assertRaises(errors.GenericError, impexpd._VerifyListening,
206 self.assertRaises(errors.GenericError, impexpd._VerifyListening,
209 for family in [socket.AF_UNIX, socket.AF_INET6]:
210 self.assertRaises(errors.GenericError, impexpd._VerifyListening,
211 family, "192.0.2.7", 1234)
212 self.assertRaises(errors.GenericError, impexpd._VerifyListening,
213 family, "[2001:db8::1", 1234)
214 self.assertRaises(errors.GenericError, impexpd._VerifyListening,
215 family, "2001:db8::1]", 1234)
217 for family in [socket.AF_UNIX, socket.AF_INET]:
218 self.assertRaises(errors.GenericError, impexpd._VerifyListening,
222 class TestCalcThroughput(unittest.TestCase):
224 self.assertEqual(impexpd._CalcThroughput([]), None)
225 self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None)
231 self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3)
238 self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3)
241 if __name__ == "__main__":
242 testutils.GanetiTestProgram()