QA: Add test for “gnt-node modify”
[ganeti-local] / test / ganeti.impexpd_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.impexpd"""
23
24 import os
25 import sys
26 import re
27 import unittest
28
29 from ganeti import constants
30 from ganeti import objects
31 from ganeti import compat
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import impexpd
35
36 import testutils
37
38
class CmdBuilderConfig(objects.ConfigObject):
  """Option container handed to L{impexpd.CommandBuilder} by the tests below.

  Only the attribute names are declared here; each test sets the ones it
  needs through keyword arguments.

  """
  __slots__ = [
    # Addresses and X509 files
    "bind", "key", "cert", "ca", "host", "port",
    # Data stream handling
    "compress", "magic",
    # Connection behaviour
    "connect_timeout", "connect_retries",
    # Extra shell fragments around the command
    "cmd_prefix", "cmd_suffix",
    ]
54
55
def CheckCmdWord(cmd, word):
  """Checks whether a word occurs in any part of a command.

  @param cmd: Iterable of strings making up the command
  @param word: Word to look for; it is escaped and has to be delimited by
    regular expression word boundaries to count as a match
  @return: Whether at least one part of C{cmd} contains C{word}

  """
  pattern = re.compile(r"\b" + re.escape(word) + r"\b")
  matches = (pattern.search(part) for part in cmd)
  return compat.any(matches)
59
60
class TestCommandBuilder(unittest.TestCase):
  """Tests for L{impexpd.CommandBuilder}.

  Every builder is created with the placeholder values C{1, 2, 3} for its
  trailing positional arguments. The tests only inspect the commands returned
  by the builder (or the errors it raises).

  """
  def test(self):
    """Checks the generated commands for many option combinations.

    Both modes and both compression settings are combined with a number of
    magic values, hosts, ports, command prefixes and command suffixes.

    """
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
      # Name of the compression program expected in the complete command
      if mode == constants.IEM_IMPORT:
        comprcmd = "gunzip"
      elif mode == constants.IEM_EXPORT:
        comprcmd = "gzip"

      for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
        for magic in [None, 10 * "-", "HelloWorld", "J9plh4nFo2",
                      "24A02A81-2264-4B51-A882-A2AB9D85B420"]:
          opts = CmdBuilderConfig(magic=magic, compress=compress)
          builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)

          magic_cmd = builder._GetMagicCommand()
          dd_cmd = builder._GetDdCommand()

          if magic:
            # The magic value must show up in both partial commands
            self.assertTrue(("M=%s" % magic) in magic_cmd)
            self.assertTrue(("M=%s" % magic) in dd_cmd)
          else:
            # Without a magic value there must be no magic command at all
            self.assertFalse(magic_cmd)

        for host in ["localhost", "198.51.100.4", "192.0.2.99"]:
          for port in [0, 1, 1234, 7856, 45452]:
            for cmd_prefix in [None, "PrefixCommandGoesHere|",
                               "dd if=/dev/hda bs=1048576 |"]:
              for cmd_suffix in [None, "< /some/file/name",
                                 "| dd of=/dev/null"]:
                opts = CmdBuilderConfig(host=host, port=port, compress=compress,
                                        cmd_prefix=cmd_prefix,
                                        cmd_suffix=cmd_suffix)

                builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)

                # Check complete command
                cmd = builder.GetCommand()
                self.assertTrue(isinstance(cmd, list))

                if compress == constants.IEC_GZIP:
                  self.assertTrue(CheckCmdWord(cmd, comprcmd))

                # A bare generator expression is always true, hence the
                # per-part checks must be reduced with any() for these
                # assertions to be able to fail
                if cmd_prefix is not None:
                  self.assertTrue(compat.any(cmd_prefix in i for i in cmd))

                if cmd_suffix is not None:
                  self.assertTrue(compat.any(cmd_suffix in i for i in cmd))

                # Check socat command
                socat_cmd = builder._GetSocatCommand()

                # The SSL address is looked for in the second to last part
                # when importing and in the last part when exporting
                if mode == constants.IEM_IMPORT:
                  ssl_addr = socat_cmd[-2].split(",")
                  self.assertTrue(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
                elif mode == constants.IEM_EXPORT:
                  ssl_addr = socat_cmd[-1].split(",")
                  self.assertTrue(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)

                self.assertTrue("verify=1" in ssl_addr)

  def testCommaError(self):
    """Checks that a CA path containing commas is rejected in both modes.

    """
    opts = CmdBuilderConfig(host="localhost", port=1234,
                            ca="/some/path/with,a/,comma")

    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
      builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
      self.assertRaises(errors.GenericError, builder.GetCommand)

  def testOptionLengthError(self):
    """Checks that options longer than the socat limit are rejected.

    Each option value is made longer than L{impexpd.SOCAT_OPTION_MAXLEN} by
    appending that many characters to an otherwise short value.

    """
    testopts = [
      CmdBuilderConfig(bind="0.0.0.0" + ("A" * impexpd.SOCAT_OPTION_MAXLEN),
                       port=1234, ca="/tmp/ca"),
      CmdBuilderConfig(host="localhost", port=1234,
                       ca="/tmp/ca" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
      CmdBuilderConfig(host="localhost", port=1234,
                       key="/tmp/key" + ("B" * impexpd.SOCAT_OPTION_MAXLEN)),
      ]

    for opts in testopts:
      for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
        builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
        self.assertRaises(errors.GenericError, builder.GetCommand)

      # Additionally try an overlong host name, in export mode only
      opts.host = "localhost" + ("A" * impexpd.SOCAT_OPTION_MAXLEN)
      builder = impexpd.CommandBuilder(constants.IEM_EXPORT, opts, 1, 2, 3)
      self.assertRaises(errors.GenericError, builder.GetCommand)

  def testModeError(self):
    """Checks that building a command for an unknown mode fails.

    """
    mode = "foobarbaz"

    # Guard against the made-up value ever becoming a real mode
    assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]

    opts = CmdBuilderConfig(host="localhost", port=1234)
    builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
    self.assertRaises(errors.GenericError, builder.GetCommand)
156
157
class TestCalcThroughput(unittest.TestCase):
  """Tests for L{impexpd._CalcThroughput}.

  """
  def test(self):
    """Feeds fixed sample lists to the function and checks the results.

    """
    calc = impexpd._CalcThroughput

    # No value is expected for an empty list or a single sample
    for samples in ([], [(0, 0)]):
      self.assertEqual(calc(samples), None)

    # Sample lists with the expected result, compared to three places
    expectations = [
      ([(0.0, 0.0), (10.0, 100.0)], 10.0),
      ([(5.0, 7.0), (10.0, 100.0), (16.0, 181.0)], 15.818),
      ]

    for (samples, expected) in expectations:
      self.assertAlmostEqual(calc(samples), expected, 3)
175
176
# Hand over to the test program helper from testutils when this file is run
# as a script rather than imported
if __name__ == "__main__":
  testutils.GanetiTestProgram()