QA: Add tests for instance start/stop via RAPI
[ganeti-local] / test / ganeti.netutils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the netutils module"""
23
24 import os
25 import re
26 import shutil
27 import socket
28 import tempfile
29 import unittest
30
31 import testutils
32 from ganeti import constants
33 from ganeti import errors
34 from ganeti import netutils
35 from ganeti import serializer
36 from ganeti import utils
37
38
def _GetSocketCredentials(path):
  """Return the peer credentials seen after connecting to a Unix socket.

  A stream socket is connected to C{path}, handed to
  L{netutils.GetSocketCredentials} and closed again in every case.

  @param path: filesystem path of the Unix socket to connect to
  @return: the value returned by L{netutils.GetSocketCredentials} for the
      connected socket

  """
  client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    # Bound all blocking socket operations to ten seconds
    client.settimeout(10)
    client.connect(path)
    credentials = netutils.GetSocketCredentials(client)
  finally:
    client.close()
  return credentials
50
51
class TestGetSocketCredentials(unittest.TestCase):
  """Check the credentials reported for a Unix socket peer.

  A Unix stream socket is bound inside a temporary directory. A forked
  child connects to it through L{_GetSocketCredentials} and sends the
  result back over a pipe; the parent compares it with its own pid, uid
  and gid.

  """
  def setUp(self):
    """Create a temporary directory with a listening Unix socket in it.

    """
    self.tmpdir = tempfile.mkdtemp()
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")

    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self.listener.settimeout(10)
    self.listener.bind(self.sockpath)
    self.listener.listen(1)

  def tearDown(self):
    """Shut the listener down and remove the temporary directory.

    Each cleanup step runs even if the previous one raised, so a failing
    shutdown leaks neither the socket nor the directory.

    """
    try:
      self.listener.shutdown(socket.SHUT_RDWR)
    finally:
      try:
        self.listener.close()
      finally:
        shutil.rmtree(self.tmpdir)

  def test(self):
    """Fork a client and verify the credentials it reports.

    """
    (c2pr, c2pw) = os.pipe()

    # Start child process
    child = os.fork()
    if child == 0:
      # The child must never return into the test runner: it leaves via
      # os._exit on every path, with status 1 if anything above raised
      try:
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))

        os.write(c2pw, data)
        os.close(c2pw)

        os._exit(0)
      finally:
        os._exit(1)

    # Parent only reads from the pipe
    os.close(c2pw)

    try:
      try:
        # Wait for one connection
        (conn, _) = self.listener.accept()
        try:
          # The child never sends data, so this returns once it has closed
          # its end of the connection
          conn.recv(1)
        finally:
          conn.close()

        # Wait for result
        result = os.read(c2pr, 4096)
      finally:
        os.close(c2pr)
    finally:
      # Always reap the child, also when the parent side failed
      (_, status) = os.waitpid(child, 0)

    # Check child's exit code
    self.assertFalse(os.WIFSIGNALED(status))
    self.assertEqual(os.WEXITSTATUS(status), 0)

    # Check result
    (pid, uid, gid) = serializer.LoadJson(result)
    self.assertEqual(pid, os.getpid())
    self.assertEqual(uid, os.getuid())
    self.assertEqual(gid, os.getgid())
104
105
class TestHostname(unittest.TestCase):
  """Tests for L{netutils.Hostname.GetNormalizedName}"""

  def testUppercase(self):
    mixed_case = "AbC.example.com"
    normalized = netutils.Hostname.GetNormalizedName(mixed_case)
    self.assertEqual(normalized, mixed_case.lower())

  def testTooLongName(self):
    # The last label alone is 255 characters long
    overlong = "a.b." + "c" * 255
    self.assertRaises(errors.OpPrereqError,
                      netutils.Hostname.GetNormalizedName, overlong)

  def testTrailingDot(self):
    expected = "a.b.c"
    normalized = netutils.Hostname.GetNormalizedName(expected + ".")
    self.assertEqual(normalized, expected)

  def testInvalidName(self):
    # Whitespace, slash, leading dot and empty label
    for bad_name in ("a b", "a/b", ".a.b", "a..b"):
      self.assertRaises(errors.OpPrereqError,
                        netutils.Hostname.GetNormalizedName, bad_name)

  def testValidName(self):
    # Already normalized names are expected to come back unchanged
    for good_name in ("a.b", "a-b", "a_b", "a.b.c"):
      normalized = netutils.Hostname.GetNormalizedName(good_name)
      self.assertEqual(normalized, good_name)
142
143
class TestIPAddress(unittest.TestCase):
  """Tests for the address family independent L{netutils.IPAddress} API"""

  def testIsValid(self):
    self.assertTrue(netutils.IPAddress.IsValid("0.0.0.0"))
    self.assertTrue(netutils.IPAddress.IsValid("127.0.0.1"))
    self.assertTrue(netutils.IPAddress.IsValid("::"))
    self.assertTrue(netutils.IPAddress.IsValid("::1"))

  def testNotIsValid(self):
    self.assertFalse(netutils.IPAddress.IsValid("0"))
    self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
    self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))

  def testGetAddressFamily(self):
    fn = netutils.IPAddress.GetAddressFamily
    self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
    self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
    self.assertEqual(fn("::1"), socket.AF_INET6)
    self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
    self.assertRaises(errors.IPAddressError, fn, "0")

  def testOwnLoopback(self):
    # FIXME: In a pure IPv6 environment this is no longer true
    self.assertTrue(netutils.IPAddress.Own("127.0.0.1"),
                    "Should own 127.0.0.1 address")

  def testNotOwnAddress(self):
    self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
                     "Should not own IP address 2001:db8::1")
    self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
                     "Should not own IP address 192.0.2.1")

  def testFamilyVersionConversions(self):
    # IPAddress.GetAddressFamilyFromVersion
    to_family = netutils.IPAddress.GetAddressFamilyFromVersion
    self.assertEqual(to_family(constants.IP4_VERSION), socket.AF_INET)
    self.assertEqual(to_family(constants.IP6_VERSION), socket.AF_INET6)
    # 3 is neither of the two IP version constants used above
    self.assertRaises(errors.ProgrammerError, to_family, 3)

    # IPAddress.GetVersionFromAddressFamily
    to_version = netutils.IPAddress.GetVersionFromAddressFamily
    self.assertEqual(to_version(socket.AF_INET), constants.IP4_VERSION)
    self.assertEqual(to_version(socket.AF_INET6), constants.IP6_VERSION)
    # A non-IP address family has no IP version
    self.assertRaises(errors.ProgrammerError, to_version, socket.AF_UNIX)
195
196
class TestIP4Address(unittest.TestCase):
  """Tests for L{netutils.IP4Address}"""

  def testGetIPIntFromString(self):
    fn = netutils.IP4Address._GetIPIntFromString
    self.assertEqual(fn("0.0.0.0"), 0)
    self.assertEqual(fn("0.0.0.1"), 1)
    # 127 * 2**24 + 1
    self.assertEqual(fn("127.0.0.1"), 2130706433)
    # 192 * 2**24 + 2 * 2**8 + 129
    self.assertEqual(fn("192.0.2.129"), 3221226113)
    self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
    self.assertNotEqual(fn("0.0.0.0"), 1)

  def testIsValid(self):
    self.assertTrue(netutils.IP4Address.IsValid("0.0.0.0"))
    self.assertTrue(netutils.IP4Address.IsValid("127.0.0.1"))
    self.assertTrue(netutils.IP4Address.IsValid("192.0.2.199"))
    self.assertTrue(netutils.IP4Address.IsValid("255.255.255.255"))

  def testNotIsValid(self):
    self.assertFalse(netutils.IP4Address.IsValid("0"))
    self.assertFalse(netutils.IP4Address.IsValid("1"))
    self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
    self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
    # An IPv6 address is not a valid IPv4 address
    self.assertFalse(netutils.IP4Address.IsValid("::1"))

  def testInNetwork(self):
    self.assertTrue(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))

  def testNotInNetwork(self):
    self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
                                                   "127.0.0.1"))

  def testIsLoopback(self):
    self.assertTrue(netutils.IP4Address.IsLoopback("127.0.0.1"))

  def testNotIsLoopback(self):
    self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
233
234
class TestIP6Address(unittest.TestCase):
  """Tests for L{netutils.IP6Address}"""

  def testGetIPIntFromString(self):
    fn = netutils.IP6Address._GetIPIntFromString
    self.assertEqual(fn("::"), 0)
    self.assertEqual(fn("::1"), 1)
    # (0x20010db8 << 96) + 1; written without the "L" suffix, which Python 2
    # does not need and Python 3 rejects
    self.assertEqual(fn("2001:db8::1"),
                     42540766411282592856903984951653826561)
    self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
    self.assertNotEqual(fn("::2"), 1)

  def testIsValid(self):
    self.assertTrue(netutils.IP6Address.IsValid("::"))
    self.assertTrue(netutils.IP6Address.IsValid("::1"))
    # Eight groups, shortest and longest form
    self.assertTrue(netutils.IP6Address.IsValid("1" + (":1" * 7)))
    self.assertTrue(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))

  def testNotIsValid(self):
    self.assertFalse(netutils.IP6Address.IsValid("0"))
    self.assertFalse(netutils.IP6Address.IsValid(":1"))
    # Only seven groups
    self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
    # Non-hexadecimal digit
    self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
    # Group with five digits
    self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
    # Nine groups
    self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
    # An IPv4 address is not a valid IPv6 address
    self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))

  def testInNetwork(self):
    self.assertTrue(netutils.IP6Address.InNetwork("::1/128", "::1"))

  def testNotInNetwork(self):
    self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))

  def testIsLoopback(self):
    self.assertTrue(netutils.IP6Address.IsLoopback("::1"))

  def testNotIsLoopback(self):
    self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
272
273
class _BaseTcpPingTest:
  """Base class for TcpPing tests against listen(2)ing port

  This is not a test case by itself: concrete subclasses combine it with
  C{unittest.TestCase} and set C{family} and C{address}.

  """
  # Socket address family to test with
  family = None
  # Address the test listener is bound to
  address = None

  def setUp(self):
    """Open a listening socket on a port chosen by the system.

    """
    self.listener = socket.socket(self.family, socket.SOCK_STREAM)
    # Port 0 lets the system pick a free port, which is read back below
    self.listener.bind((self.address, 0))
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    """Shut down and close the listener and drop the fixture attributes.

    The socket is closed explicitly, also if the shutdown fails, instead of
    leaving that to garbage collection.

    """
    try:
      self.listener.shutdown(socket.SHUT_RDWR)
    finally:
      self.listener.close()
      del self.listener
      del self.listenerport

  def testTcpPingToLocalHostAccept(self):
    """TcpPing must succeed against the listener, with and without source.

    """
    self.assertTrue(netutils.TcpPing(self.address,
                                     self.listenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=True,
                                     source=self.address,
                                     ),
                    "failed to connect to test listener")

    self.assertTrue(netutils.TcpPing(self.address, self.listenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=True),
                    "failed to connect to test listener (no source)")
303
304
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
  """IPv4 variant of the TcpPing tests against a listen(2)ing port"""
  family = socket.AF_INET
  address = constants.IP4_ADDRESS_LOCALHOST

  def setUp(self):
    # Run the fixture code of both bases explicitly, TestCase first
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.setUp(self)

  def tearDown(self):
    # Same order as in setUp: TestCase first, then the listener cleanup
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.tearDown(self)
317
318
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
  """IPv6 variant of the TcpPing tests against a listen(2)ing port"""
  family = socket.AF_INET6
  address = constants.IP6_ADDRESS_LOCALHOST

  def setUp(self):
    # Run the fixture code of both bases explicitly, TestCase first
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.setUp(self)

  def tearDown(self):
    # Same order as in setUp: TestCase first, then the listener cleanup
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.tearDown(self)
331
332
class _BaseTcpPingDeafTest:
  """Base class for TcpPing tests against non listen(2)ing port

  This is not a test case by itself: concrete subclasses combine it with
  C{unittest.TestCase} and set C{family} and C{address}.

  """
  # Socket address family to test with
  family = None
  # Address the deaf socket is bound to
  address = None

  def setUp(self):
    """Bind a socket to a port chosen by the system without listening.

    """
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
    # Port 0 lets the system pick a free port, which is read back below
    self.deaflistener.bind((self.address, 0))
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    """Close the bound socket and drop the fixture attributes.

    The socket is closed explicitly instead of leaving that to garbage
    collection.

    """
    try:
      self.deaflistener.close()
    finally:
      del self.deaflistener
      del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
    """With live_port_needed, pinging the deaf port must fail.

    """
    self.assertFalse(netutils.TcpPing(self.address,
                                      self.deaflistenerport,
                                      timeout=constants.TCP_PING_TIMEOUT,
                                      live_port_needed=True,
                                      source=self.address,
                                      ), # need successful connect(2)
                     "successfully connected to deaf listener")

    self.assertFalse(netutils.TcpPing(self.address,
                                      self.deaflistenerport,
                                      timeout=constants.TCP_PING_TIMEOUT,
                                      live_port_needed=True,
                                      ), # need successful connect(2)
                     "successfully connected to deaf listener (no source)")

  def testTcpPingToLocalHostNoAccept(self):
    """Without live_port_needed, pinging the deaf port must succeed.

    """
    self.assertTrue(netutils.TcpPing(self.address,
                                     self.deaflistenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=False,
                                     source=self.address,
                                     ), # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port")

    self.assertTrue(netutils.TcpPing(self.address,
                                     self.deaflistenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=False,
                                     ), # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port (no source)")
378
379
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
  """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
  family = socket.AF_INET
  address = constants.IP4_ADDRESS_LOCALHOST

  def setUp(self):
    # Delegate to the shared fixture code instead of duplicating it, as the
    # IPv6 variant of this test case does
    unittest.TestCase.setUp(self)
    _BaseTcpPingDeafTest.setUp(self)

  def tearDown(self):
    unittest.TestCase.tearDown(self)
    _BaseTcpPingDeafTest.tearDown(self)
393
394
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
  """IPv6 variant of the TcpPing tests against a non listen(2)ing port"""
  family = socket.AF_INET6
  address = constants.IP6_ADDRESS_LOCALHOST

  def setUp(self):
    # Run the fixture code of both bases explicitly, TestCase first
    for base in (unittest.TestCase, _BaseTcpPingDeafTest):
      base.setUp(self)

  def tearDown(self):
    # Same order as in setUp: TestCase first, then the socket cleanup
    for base in (unittest.TestCase, _BaseTcpPingDeafTest):
      base.tearDown(self)
407
408
class TestFormatAddress(unittest.TestCase):
  """Tests for L{netutils.FormatAddress}"""

  def testFormatAddressUnixSocket(self):
    fmt = netutils.FormatAddress
    self.assertEqual(fmt(("12352", 0, 0), family=socket.AF_UNIX),
                     "pid=12352, uid=0, gid=0")

  def testFormatAddressIP4(self):
    fmt = netutils.FormatAddress
    self.assertEqual(fmt(("127.0.0.1", 1234), family=socket.AF_INET),
                     "127.0.0.1:1234")
    # A port of None is left out of the result
    self.assertEqual(fmt(("192.0.2.32", None), family=socket.AF_INET),
                     "192.0.2.32")

  def testFormatAddressIP6(self):
    fmt = netutils.FormatAddress
    self.assertEqual(fmt(("::1", 1234), family=socket.AF_INET6),
                     "[::1]:1234")
    # A port of None is left out of the result
    self.assertEqual(fmt(("::1", None), family=socket.AF_INET6),
                     "[::1]")
    # The port may also be given as a string
    self.assertEqual(fmt(("2001:db8::beef", "80"), family=socket.AF_INET6),
                     "[2001:db8::beef]:80")

  def testFormatAddressWithoutFamily(self):
    fmt = netutils.FormatAddress
    self.assertEqual(fmt(("127.0.0.1", 1234)), "127.0.0.1:1234")
    self.assertEqual(fmt(("::1", 1234)), "[::1]:1234")

  def testInvalidFormatAddress(self):
    fmt = netutils.FormatAddress
    # Plain strings instead of address tuples are expected to be rejected
    self.assertRaises(errors.ParameterError, fmt, "127.0.0.1")
    self.assertRaises(errors.ParameterError, fmt, "127.0.0.1",
                      family=socket.AF_INET)
    # This argument is a plain string as well, not a one-element tuple
    self.assertRaises(errors.ParameterError, fmt, "::1",
                      family=socket.AF_INET)
445
class TestIpParsing(testutils.GanetiTestCase):
  """Test the code that parses the ip command output"""

  def _ParseTestFile(self, test_file):
    """Run the ip output parser on one test data file.

    @param test_file: name of the test data file, passed to
        C{self._ReadTestData}
    @return: result of L{netutils._GetIpAddressesFromIpOutput}; the tests
        below index it with 4 and 6

    """
    data = self._ReadTestData(test_file)
    return netutils._GetIpAddressesFromIpOutput(data)

  def testIp4(self):
    valid_addresses = [constants.IP4_ADDRESS_ANY,
                       constants.IP4_ADDRESS_LOCALHOST,
                       "192.0.2.1",     # RFC5737, IPv4 address blocks for docs
                       "198.51.100.1",
                       "203.0.113.1",
                      ]
    for addr in valid_addresses:
      self.assertTrue(re.search(netutils._IP_RE_TEXT, addr),
                      "no match for address %r" % addr)

  def testIp6(self):
    valid_addresses = [constants.IP6_ADDRESS_ANY,
                       constants.IP6_ADDRESS_LOCALHOST,
                       "0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
                       "0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
                       "2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
                       "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
                       "0:0:0:0:0:FFFF:192.0.2.1",  # IPv4-compatible IPv6
                       "::FFFF:192.0.2.1",
                       "0:0:0:0:0:0:203.0.113.1",   # IPv4-mapped IPv6
                       "::203.0.113.1",
                      ]
    for addr in valid_addresses:
      self.assertTrue(re.search(netutils._IP_RE_TEXT, addr),
                      "no match for address %r" % addr)

  def testParseIpCommandOutput(self):
    # Each condition is asserted separately so that a failure shows which
    # address list was wrong and what it contained

    # IPv4-only, fake loopback interface
    tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
    for test_file in tests:
      addr = self._ParseTestFile(test_file)
      self.assertEqual(list(addr[4]), ["127.0.0.1"])
      self.assertFalse(addr[6])

    # IPv6-only, fake loopback interface
    # NOTE(review): the same file is listed twice; going by the other lists
    # the second entry was presumably meant to be a one-line variant --
    # confirm that such a test data file exists before changing it
    tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-ipv6.txt"]
    for test_file in tests:
      addr = self._ParseTestFile(test_file)
      self.assertEqual(list(addr[6]), ["::1"])
      self.assertFalse(addr[4])

    # IPv4 and IPv6, fake loopback interface
    tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
    for test_file in tests:
      addr = self._ParseTestFile(test_file)
      self.assertEqual(list(addr[6]), ["::1"])
      self.assertEqual(list(addr[4]), ["127.0.0.1"])

    # IPv4 and IPv6, dummy interface
    addr = self._ParseTestFile("ip-addr-show-dummy0.txt")
    self.assertEqual(list(addr[6]), ["2001:db8:85a3::8a2e:370:7334"])
    self.assertEqual(list(addr[4]), ["192.0.2.1"])
505
506
# Script entry point: hand control to testutils.GanetiTestProgram
# (presumably the project's wrapper around the unittest runner -- defined
# outside this file)
if __name__ == "__main__":
  testutils.GanetiTestProgram()