verify-disks: Explicitely state nothing has to be done
[ganeti-local] / test / ganeti.impexpd_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.impexpd"""
23
24 import os
25 import sys
26 import re
27 import unittest
28 import socket
29
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import impexpd
36
37 import testutils
38
39
40 class CmdBuilderConfig(objects.ConfigObject):
41   __slots__ = [
42     "bind",
43     "key",
44     "cert",
45     "ca",
46     "host",
47     "port",
48     "ipv4",
49     "ipv6",
50     "compress",
51     "magic",
52     "connect_timeout",
53     "connect_retries",
54     "cmd_prefix",
55     "cmd_suffix",
56     ]
57
58
59 def CheckCmdWord(cmd, word):
60   wre = re.compile(r"\b%s\b" % re.escape(word))
61   return compat.any(wre.search(i) for i in cmd)
62
63
64 class TestCommandBuilder(unittest.TestCase):
65   def test(self):
66     for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
67       if mode == constants.IEM_IMPORT:
68         comprcmd = "gunzip"
69       elif mode == constants.IEM_EXPORT:
70         comprcmd = "gzip"
71
72       for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
73         for magic in [None, 10 * "-", "HelloWorld", "J9plh4nFo2",
74                       "24A02A81-2264-4B51-A882-A2AB9D85B420"]:
75           opts = CmdBuilderConfig(magic=magic, compress=compress)
76           builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
77
78           magic_cmd = builder._GetMagicCommand()
79           dd_cmd = builder._GetDdCommand()
80
81           if magic:
82             self.assert_(("M=%s" % magic) in magic_cmd)
83             self.assert_(("M=%s" % magic) in dd_cmd)
84           else:
85             self.assertFalse(magic_cmd)
86
87         for host in ["localhost", "198.51.100.4", "192.0.2.99"]:
88           for port in [0, 1, 1234, 7856, 45452]:
89             for cmd_prefix in [None, "PrefixCommandGoesHere|",
90                                "dd if=/dev/hda bs=1048576 |"]:
91               for cmd_suffix in [None, "< /some/file/name",
92                                  "| dd of=/dev/null"]:
93                 opts = CmdBuilderConfig(host=host, port=port, compress=compress,
94                                         cmd_prefix=cmd_prefix,
95                                         cmd_suffix=cmd_suffix)
96
97                 builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
98
99                 # Check complete command
100                 cmd = builder.GetCommand()
101                 self.assert_(isinstance(cmd, list))
102
103                 if compress == constants.IEC_GZIP:
104                   self.assert_(CheckCmdWord(cmd, comprcmd))
105
106                 if cmd_prefix is not None:
107                   self.assert_(compat.any(cmd_prefix in i for i in cmd))
108
109                 if cmd_suffix is not None:
110                   self.assert_(compat.any(cmd_suffix in i for i in cmd))
111
112                 # Check socat command
113                 socat_cmd = builder._GetSocatCommand()
114
115                 if mode == constants.IEM_IMPORT:
116                   ssl_addr = socat_cmd[-2].split(",")
117                   self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
118                 elif mode == constants.IEM_EXPORT:
119                   ssl_addr = socat_cmd[-1].split(",")
120                   self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
121
122                 self.assert_("verify=1" in ssl_addr)
123
124   def testIPv6(self):
125     for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
126       opts = CmdBuilderConfig(host="localhost", port=6789,
127                               ipv4=False, ipv6=False)
128       builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
129       cmd = builder._GetSocatCommand()
130       self.assert_(compat.all("pf=" not in i for i in cmd))
131
132       # IPv4
133       opts = CmdBuilderConfig(host="localhost", port=6789,
134                               ipv4=True, ipv6=False)
135       builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
136       cmd = builder._GetSocatCommand()
137       self.assert_(compat.any(",pf=ipv4" in i for i in cmd))
138
139       # IPv6
140       opts = CmdBuilderConfig(host="localhost", port=6789,
141                               ipv4=False, ipv6=True)
142       builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
143       cmd = builder._GetSocatCommand()
144       self.assert_(compat.any(",pf=ipv6" in i for i in cmd))
145
146       # IPv4 and IPv6
147       opts = CmdBuilderConfig(host="localhost", port=6789,
148                               ipv4=True, ipv6=True)
149       builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
150       self.assertRaises(AssertionError, builder._GetSocatCommand)
151
152   def testCommaError(self):
153     opts = CmdBuilderConfig(host="localhost", port=1234,
154                             ca="/some/path/with,a/,comma")
155
156     for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
157       builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
158       self.assertRaises(errors.GenericError, builder.GetCommand)
159
160   def testOptionLengthError(self):
161     testopts = [
162       CmdBuilderConfig(bind="0.0.0.0" + ("A" * impexpd.SOCAT_OPTION_MAXLEN),
163                        port=1234, ca="/tmp/ca"),
164       CmdBuilderConfig(host="localhost", port=1234,
165                        ca="/tmp/ca" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
166       CmdBuilderConfig(host="localhost", port=1234,
167                        key="/tmp/key" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
168       ]
169
170     for opts in testopts:
171       for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
172         builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
173         self.assertRaises(errors.GenericError, builder.GetCommand)
174
175       opts.host = "localhost" + ("A" * impexpd.SOCAT_OPTION_MAXLEN)
176       builder = impexpd.CommandBuilder(constants.IEM_EXPORT, opts, 1, 2, 3)
177       self.assertRaises(errors.GenericError, builder.GetCommand)
178
179   def testModeError(self):
180     mode = "foobarbaz"
181
182     assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
183
184     opts = CmdBuilderConfig(host="localhost", port=1234)
185     builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
186     self.assertRaises(errors.GenericError, builder.GetCommand)
187
188
189 class TestVerifyListening(unittest.TestCase):
190   def test(self):
191     self.assertEqual(impexpd._VerifyListening(socket.AF_INET,
192                                               "192.0.2.7", 1234),
193                      ("192.0.2.7", 1234))
194     self.assertEqual(impexpd._VerifyListening(socket.AF_INET6, "::1", 9876),
195                      ("::1", 9876))
196     self.assertEqual(impexpd._VerifyListening(socket.AF_INET6, "[::1]", 4563),
197                      ("::1", 4563))
198     self.assertEqual(impexpd._VerifyListening(socket.AF_INET6,
199                                               "[2001:db8::1:4563]", 4563),
200                      ("2001:db8::1:4563", 4563))
201
202   def testError(self):
203     for family in [socket.AF_UNIX, socket.AF_INET, socket.AF_INET6]:
204       self.assertRaises(errors.GenericError, impexpd._VerifyListening,
205                         family, "", 1234)
206       self.assertRaises(errors.GenericError, impexpd._VerifyListening,
207                         family, "192", 999)
208
209     for family in [socket.AF_UNIX, socket.AF_INET6]:
210       self.assertRaises(errors.GenericError, impexpd._VerifyListening,
211                         family, "192.0.2.7", 1234)
212       self.assertRaises(errors.GenericError, impexpd._VerifyListening,
213                         family, "[2001:db8::1", 1234)
214       self.assertRaises(errors.GenericError, impexpd._VerifyListening,
215                         family, "2001:db8::1]", 1234)
216
217     for family in [socket.AF_UNIX, socket.AF_INET]:
218       self.assertRaises(errors.GenericError, impexpd._VerifyListening,
219                         family, "::1", 1234)
220
221
222 class TestCalcThroughput(unittest.TestCase):
223   def test(self):
224     self.assertEqual(impexpd._CalcThroughput([]), None)
225     self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None)
226
227     samples = [
228       (0.0, 0.0),
229       (10.0, 100.0),
230       ]
231     self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3)
232
233     samples = [
234       (5.0, 7.0),
235       (10.0, 100.0),
236       (16.0, 181.0),
237       ]
238     self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3)
239
240
241 if __name__ == "__main__":
242   testutils.GanetiTestProgram()