rapi.client, http.client: Format url correctly when using IPv6
[ganeti-local] / test / ganeti.netutils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the netutils module"""
23
24 import os
25 import shutil
26 import socket
27 import tempfile
28 import unittest
29
30 import testutils
31 from ganeti import constants
32 from ganeti import errors
33 from ganeti import netutils
34 from ganeti import serializer
35 from ganeti import utils
36
37
38 def _GetSocketCredentials(path):
39   """Connect to a Unix socket and return remote credentials.
40
41   """
42   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
43   try:
44     sock.settimeout(10)
45     sock.connect(path)
46     return netutils.GetSocketCredentials(sock)
47   finally:
48     sock.close()
49
50
51 class TestGetSocketCredentials(unittest.TestCase):
52   def setUp(self):
53     self.tmpdir = tempfile.mkdtemp()
54     self.sockpath = utils.PathJoin(self.tmpdir, "sock")
55
56     self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
57     self.listener.settimeout(10)
58     self.listener.bind(self.sockpath)
59     self.listener.listen(1)
60
61   def tearDown(self):
62     self.listener.shutdown(socket.SHUT_RDWR)
63     self.listener.close()
64     shutil.rmtree(self.tmpdir)
65
66   def test(self):
67     (c2pr, c2pw) = os.pipe()
68
69     # Start child process
70     child = os.fork()
71     if child == 0:
72       try:
73         data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
74
75         os.write(c2pw, data)
76         os.close(c2pw)
77
78         os._exit(0)
79       finally:
80         os._exit(1)
81
82     os.close(c2pw)
83
84     # Wait for one connection
85     (conn, _) = self.listener.accept()
86     conn.recv(1)
87     conn.close()
88
89     # Wait for result
90     result = os.read(c2pr, 4096)
91     os.close(c2pr)
92
93     # Check child's exit code
94     (_, status) = os.waitpid(child, 0)
95     self.assertFalse(os.WIFSIGNALED(status))
96     self.assertEqual(os.WEXITSTATUS(status), 0)
97
98     # Check result
99     (pid, uid, gid) = serializer.LoadJson(result)
100     self.assertEqual(pid, os.getpid())
101     self.assertEqual(uid, os.getuid())
102     self.assertEqual(gid, os.getgid())
103
104
105 class TestHostname(unittest.TestCase):
106   """Testing case for Hostname"""
107
108   def testUppercase(self):
109     data = "AbC.example.com"
110     self.assertEqual(netutils.Hostname.GetNormalizedName(data), data.lower())
111
112   def testTooLongName(self):
113     data = "a.b." + "c" * 255
114     self.assertRaises(errors.OpPrereqError,
115                       netutils.Hostname.GetNormalizedName, data)
116
117   def testTrailingDot(self):
118     data = "a.b.c"
119     self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
120
121   def testInvalidName(self):
122     data = [
123       "a b",
124       "a/b",
125       ".a.b",
126       "a..b",
127       ]
128     for value in data:
129       self.assertRaises(errors.OpPrereqError,
130                         netutils.Hostname.GetNormalizedName, value)
131
132   def testValidName(self):
133     data = [
134       "a.b",
135       "a-b",
136       "a_b",
137       "a.b.c",
138       ]
139     for value in data:
140       self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
141
142
143 class TestIPAddress(unittest.TestCase):
144   def testIsValid(self):
145     self.assert_(netutils.IPAddress.IsValid("0.0.0.0"))
146     self.assert_(netutils.IPAddress.IsValid("127.0.0.1"))
147     self.assert_(netutils.IPAddress.IsValid("::"))
148     self.assert_(netutils.IPAddress.IsValid("::1"))
149
150   def testNotIsValid(self):
151     self.assertFalse(netutils.IPAddress.IsValid("0"))
152     self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
153     self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))
154
155
156   def testGetAddressFamily(self):
157     fn = netutils.IPAddress.GetAddressFamily
158     self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
159     self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
160     self.assertEqual(fn("::1"), socket.AF_INET6)
161     self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
162     self.assertRaises(errors.IPAddressError, fn, "0")
163
164   def testOwnLoopback(self):
165     # FIXME: In a pure IPv6 environment this is no longer true
166     self.assert_(netutils.IPAddress.Own("127.0.0.1"),
167                  "Should own 127.0.0.1 address")
168
169   def testNotOwnAddress(self):
170     self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
171                      "Should not own IP address 2001:db8::1")
172     self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
173                      "Should not own IP address 192.0.2.1")
174
175
176 class TestIP4Address(unittest.TestCase):
177   def testGetIPIntFromString(self):
178     fn = netutils.IP4Address._GetIPIntFromString
179     self.assertEqual(fn("0.0.0.0"), 0)
180     self.assertEqual(fn("0.0.0.1"), 1)
181     self.assertEqual(fn("127.0.0.1"), 2130706433)
182     self.assertEqual(fn("192.0.2.129"), 3221226113)
183     self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
184     self.assertNotEqual(fn("0.0.0.0"), 1)
185     self.assertNotEqual(fn("0.0.0.0"), 1)
186
187   def testIsValid(self):
188     self.assert_(netutils.IP4Address.IsValid("0.0.0.0"))
189     self.assert_(netutils.IP4Address.IsValid("127.0.0.1"))
190     self.assert_(netutils.IP4Address.IsValid("192.0.2.199"))
191     self.assert_(netutils.IP4Address.IsValid("255.255.255.255"))
192
193   def testNotIsValid(self):
194     self.assertFalse(netutils.IP4Address.IsValid("0"))
195     self.assertFalse(netutils.IP4Address.IsValid("1"))
196     self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
197     self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
198     self.assertFalse(netutils.IP4Address.IsValid("::1"))
199
200   def testInNetwork(self):
201     self.assert_(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))
202
203   def testNotInNetwork(self):
204     self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
205                                                    "127.0.0.1"))
206
207   def testIsLoopback(self):
208     self.assert_(netutils.IP4Address.IsLoopback("127.0.0.1"))
209
210   def testNotIsLoopback(self):
211     self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
212
213
214 class TestIP6Address(unittest.TestCase):
215   def testGetIPIntFromString(self):
216     fn = netutils.IP6Address._GetIPIntFromString
217     self.assertEqual(fn("::"), 0)
218     self.assertEqual(fn("::1"), 1)
219     self.assertEqual(fn("2001:db8::1"),
220                      42540766411282592856903984951653826561L)
221     self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
222     self.assertNotEqual(netutils.IP6Address._GetIPIntFromString("::2"), 1)
223
224   def testIsValid(self):
225     self.assert_(netutils.IP6Address.IsValid("::"))
226     self.assert_(netutils.IP6Address.IsValid("::1"))
227     self.assert_(netutils.IP6Address.IsValid("1" + (":1" * 7)))
228     self.assert_(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))
229     self.assert_(netutils.IP6Address.IsValid("::"))
230
231   def testNotIsValid(self):
232     self.assertFalse(netutils.IP6Address.IsValid("0"))
233     self.assertFalse(netutils.IP6Address.IsValid(":1"))
234     self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
235     self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
236     self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
237     self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
238     self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))
239
240   def testInNetwork(self):
241     self.assert_(netutils.IP6Address.InNetwork("::1/128", "::1"))
242
243   def testNotInNetwork(self):
244     self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))
245
246   def testIsLoopback(self):
247     self.assert_(netutils.IP6Address.IsLoopback("::1"))
248
249   def testNotIsLoopback(self):
250     self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
251
252
253 class _BaseTcpPingTest:
254   """Base class for TcpPing tests against listen(2)ing port"""
255   family = None
256   address = None
257
258   def setUp(self):
259     self.listener = socket.socket(self.family, socket.SOCK_STREAM)
260     self.listener.bind((self.address, 0))
261     self.listenerport = self.listener.getsockname()[1]
262     self.listener.listen(1)
263
264   def tearDown(self):
265     self.listener.shutdown(socket.SHUT_RDWR)
266     del self.listener
267     del self.listenerport
268
269   def testTcpPingToLocalHostAccept(self):
270     self.assert_(netutils.TcpPing(self.address,
271                                   self.listenerport,
272                                   timeout=constants.TCP_PING_TIMEOUT,
273                                   live_port_needed=True,
274                                   source=self.address,
275                                   ),
276                  "failed to connect to test listener")
277
278     self.assert_(netutils.TcpPing(self.address, self.listenerport,
279                                   timeout=constants.TCP_PING_TIMEOUT,
280                                   live_port_needed=True),
281                  "failed to connect to test listener (no source)")
282
283
284 class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
285   """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
286   family = socket.AF_INET
287   address = constants.IP4_ADDRESS_LOCALHOST
288
289   def setUp(self):
290     unittest.TestCase.setUp(self)
291     _BaseTcpPingTest.setUp(self)
292
293   def tearDown(self):
294     unittest.TestCase.tearDown(self)
295     _BaseTcpPingTest.tearDown(self)
296
297
298 class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
299   """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
300   family = socket.AF_INET6
301   address = constants.IP6_ADDRESS_LOCALHOST
302
303   def setUp(self):
304     unittest.TestCase.setUp(self)
305     _BaseTcpPingTest.setUp(self)
306
307   def tearDown(self):
308     unittest.TestCase.tearDown(self)
309     _BaseTcpPingTest.tearDown(self)
310
311
312 class _BaseTcpPingDeafTest:
313   """Base class for TcpPing tests against non listen(2)ing port"""
314   family = None
315   address = None
316
317   def setUp(self):
318     self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
319     self.deaflistener.bind((self.address, 0))
320     self.deaflistenerport = self.deaflistener.getsockname()[1]
321
322   def tearDown(self):
323     del self.deaflistener
324     del self.deaflistenerport
325
326   def testTcpPingToLocalHostAcceptDeaf(self):
327     self.assertFalse(netutils.TcpPing(self.address,
328                                       self.deaflistenerport,
329                                       timeout=constants.TCP_PING_TIMEOUT,
330                                       live_port_needed=True,
331                                       source=self.address,
332                                       ), # need successful connect(2)
333                      "successfully connected to deaf listener")
334
335     self.assertFalse(netutils.TcpPing(self.address,
336                                       self.deaflistenerport,
337                                       timeout=constants.TCP_PING_TIMEOUT,
338                                       live_port_needed=True,
339                                       ), # need successful connect(2)
340                      "successfully connected to deaf listener (no source)")
341
342   def testTcpPingToLocalHostNoAccept(self):
343     self.assert_(netutils.TcpPing(self.address,
344                                   self.deaflistenerport,
345                                   timeout=constants.TCP_PING_TIMEOUT,
346                                   live_port_needed=False,
347                                   source=self.address,
348                                   ), # ECONNREFUSED is OK
349                  "failed to ping alive host on deaf port")
350
351     self.assert_(netutils.TcpPing(self.address,
352                                   self.deaflistenerport,
353                                   timeout=constants.TCP_PING_TIMEOUT,
354                                   live_port_needed=False,
355                                   ), # ECONNREFUSED is OK
356                  "failed to ping alive host on deaf port (no source)")
357
358
359 class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
360   """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
361   family = socket.AF_INET
362   address = constants.IP4_ADDRESS_LOCALHOST
363
364   def setUp(self):
365     self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
366     self.deaflistener.bind((self.address, 0))
367     self.deaflistenerport = self.deaflistener.getsockname()[1]
368
369   def tearDown(self):
370     del self.deaflistener
371     del self.deaflistenerport
372
373
374 class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
375   """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
376   family = socket.AF_INET6
377   address = constants.IP6_ADDRESS_LOCALHOST
378
379   def setUp(self):
380     unittest.TestCase.setUp(self)
381     _BaseTcpPingDeafTest.setUp(self)
382
383   def tearDown(self):
384     unittest.TestCase.tearDown(self)
385     _BaseTcpPingDeafTest.tearDown(self)
386
387
388 class TestFormatAddress(unittest.TestCase):
389   """Testcase for FormatAddress"""
390
391   def testFormatAddressUnixSocket(self):
392     res1 = netutils.FormatAddress(socket.AF_UNIX, ("12352", 0, 0))
393     self.assertEqual(res1, "pid=12352, uid=0, gid=0")
394
395   def testFormatAddressIP4(self):
396     res1 = netutils.FormatAddress(socket.AF_INET, ("127.0.0.1", 1234))
397     self.assertEqual(res1, "127.0.0.1:1234")
398     res2 = netutils.FormatAddress(socket.AF_INET, ("192.0.2.32", None))
399     self.assertEqual(res2, "192.0.2.32")
400
401   def testFormatAddressIP6(self):
402     res1 = netutils.FormatAddress(socket.AF_INET6, ("::1", 1234))
403     self.assertEqual(res1, "[::1]:1234")
404     res2 = netutils.FormatAddress(socket.AF_INET6, ("::1", None))
405     self.assertEqual(res2, "[::1]")
406     res2 = netutils.FormatAddress(socket.AF_INET6, ("2001:db8::beef", "80"))
407     self.assertEqual(res2, "[2001:db8::beef]:80")
408
409   def testInvalidFormatAddress(self):
410     self.assertRaises(errors.ParameterError,
411                       netutils.FormatAddress, None, ("::1", None))
412     self.assertRaises(errors.ParameterError,
413                       netutils.FormatAddress, socket.AF_INET, "127.0.0.1")
414     self.assertRaises(errors.ParameterError,
415                       netutils.FormatAddress, socket.AF_INET, ("::1"))
416
417
418 if __name__ == "__main__":
419   testutils.GanetiTestProgram()