verify-disks: Explicitly state nothing has to be done
[ganeti-local] / test / ganeti.netutils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the netutils module"""
23
24 import os
25 import re
26 import shutil
27 import socket
28 import tempfile
29 import unittest
30
31 import testutils
32 from ganeti import constants
33 from ganeti import errors
34 from ganeti import netutils
35 from ganeti import serializer
36 from ganeti import utils
37
38
def _GetSocketCredentials(path):
  """Return the peer credentials seen when connecting to a Unix socket.

  A stream connection to C{path} is opened, the credentials are queried
  through L{netutils.GetSocketCredentials} and the socket is closed again,
  whether or not the query succeeded.

  @param path: filesystem path of the Unix socket to connect to
  @return: whatever L{netutils.GetSocketCredentials} returns for the socket

  """
  client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    client.settimeout(10)
    client.connect(path)
    credentials = netutils.GetSocketCredentials(client)
  finally:
    client.close()
  return credentials
50
51
class TestGetSocketCredentials(unittest.TestCase):
  """Checks the credentials reported for a real Unix socket connection.

  The test process listens on a socket inside a temporary directory; a
  forked child connects to it, looks up the credentials of its peer and
  reports them back through a pipe.

  """
  def setUp(self):
    # Fresh temporary directory holding the socket; removed in tearDown
    self.tmpdir = tempfile.mkdtemp()
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")

    # Listening side of the connection, owned by the test process itself
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self.listener.settimeout(10)
    self.listener.bind(self.sockpath)
    self.listener.listen(1)

  def tearDown(self):
    self.listener.shutdown(socket.SHUT_RDWR)
    self.listener.close()
    shutil.rmtree(self.tmpdir)

  def test(self):
    # Pipe for the child-to-parent result: c2pr is the read end, c2pw the
    # write end
    (c2pr, c2pw) = os.pipe()

    # Start child process
    child = os.fork()
    if child == 0:
      # Child: it must always leave through os._exit() so that it never
      # returns into the parent's test runner. On success os._exit(0) ends
      # the process right away; if anything raises, the "finally" clause
      # makes it exit with status 1 instead.
      try:
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))

        os.write(c2pw, data)
        os.close(c2pw)

        os._exit(0)
      finally:
        os._exit(1)

    # Parent: the write end belongs to the child only
    os.close(c2pw)

    # Wait for one connection
    (conn, _) = self.listener.accept()
    # NOTE(review): presumably this only returns once the child closes its
    # end of the connection (nothing visible here sends data) - confirm
    conn.recv(1)
    conn.close()

    # Wait for result
    result = os.read(c2pr, 4096)
    os.close(c2pr)

    # Check child's exit code
    (_, status) = os.waitpid(child, 0)
    self.assertFalse(os.WIFSIGNALED(status))
    self.assertEqual(os.WEXITSTATUS(status), 0)

    # Check result
    # The child reported the credentials of its peer, which is this
    # (listening) process, hence the comparison with our own IDs
    (pid, uid, gid) = serializer.LoadJson(result)
    self.assertEqual(pid, os.getpid())
    self.assertEqual(uid, os.getuid())
    self.assertEqual(gid, os.getgid())
104
105
class TestHostname(unittest.TestCase):
  """Tests for L{netutils.Hostname.GetNormalizedName}"""

  def testUppercase(self):
    # Normalization lowercases the name
    name = "AbC.example.com"
    normalized = netutils.Hostname.GetNormalizedName(name)
    self.assertEqual(normalized, name.lower())

  def testTooLongName(self):
    # Two short labels followed by a 255 character label
    name = "a.b." + "c" * 255
    self.assertRaises(errors.OpPrereqError,
                      netutils.Hostname.GetNormalizedName, name)

  def testTrailingDot(self):
    # A single trailing dot is stripped
    name = "a.b.c"
    normalized = netutils.Hostname.GetNormalizedName(name + ".")
    self.assertEqual(normalized, name)

  def testInvalidName(self):
    # Whitespace, slashes, leading dots and empty labels are all rejected
    invalid_names = ["a b", "a/b", ".a.b", "a..b"]
    for name in invalid_names:
      self.assertRaises(errors.OpPrereqError,
                        netutils.Hostname.GetNormalizedName, name)

  def testValidName(self):
    # Already normalized names must come back unchanged
    valid_names = ["a.b", "a-b", "a_b", "a.b.c"]
    for name in valid_names:
      normalized = netutils.Hostname.GetNormalizedName(name)
      self.assertEqual(normalized, name)
142
143
class TestIPAddress(unittest.TestCase):
  """Tests for the address-family independent L{netutils.IPAddress} API"""

  def testIsValid(self):
    # Both IPv4 and IPv6 notations are accepted by the generic class
    self.assertTrue(netutils.IPAddress.IsValid("0.0.0.0"))
    self.assertTrue(netutils.IPAddress.IsValid("127.0.0.1"))
    self.assertTrue(netutils.IPAddress.IsValid("::"))
    self.assertTrue(netutils.IPAddress.IsValid("::1"))

  def testNotIsValid(self):
    self.assertFalse(netutils.IPAddress.IsValid("0"))
    self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
    self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))

  def testGetAddressFamily(self):
    fn = netutils.IPAddress.GetAddressFamily
    self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
    self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
    self.assertEqual(fn("::1"), socket.AF_INET6)
    self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
    self.assertRaises(errors.IPAddressError, fn, "0")

  def testValidateNetmask(self):
    # Prefix lengths just outside and just inside the accepted range
    for netmask in [0, 33]:
      self.assertFalse(netutils.IP4Address.ValidateNetmask(netmask))

    for netmask in [1, 32]:
      self.assertTrue(netutils.IP4Address.ValidateNetmask(netmask))

    for netmask in [0, 129]:
      self.assertFalse(netutils.IP6Address.ValidateNetmask(netmask))

    for netmask in [1, 128]:
      self.assertTrue(netutils.IP6Address.ValidateNetmask(netmask))

  def testGetClassFromX(self):
    # assertEqual (instead of asserting on "a == b") reports both values
    # when the lookup returns the wrong class
    self.assertEqual(
        netutils.IPAddress.GetClassFromIpVersion(constants.IP4_VERSION),
        netutils.IP4Address)
    self.assertEqual(
        netutils.IPAddress.GetClassFromIpVersion(constants.IP6_VERSION),
        netutils.IP6Address)
    self.assertEqual(
        netutils.IPAddress.GetClassFromIpFamily(socket.AF_INET),
        netutils.IP4Address)
    self.assertEqual(
        netutils.IPAddress.GetClassFromIpFamily(socket.AF_INET6),
        netutils.IP6Address)

  def testOwnLoopback(self):
    # FIXME: In a pure IPv6 environment this is no longer true
    self.assertTrue(netutils.IPAddress.Own("127.0.0.1"),
                    "Should own 127.0.0.1 address")

  def testNotOwnAddress(self):
    # Documentation-only address blocks, never configured on a real host
    self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
                     "Should not own IP address 2001:db8::1")
    self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
                     "Should not own IP address 192.0.2.1")

  def testFamilyVersionConversions(self):
    # IPAddress.GetAddressFamilyFromVersion
    self.assertEqual(
        netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP4_VERSION),
        socket.AF_INET)
    self.assertEqual(
        netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP6_VERSION),
        socket.AF_INET6)
    self.assertRaises(errors.ProgrammerError,
        netutils.IPAddress.GetAddressFamilyFromVersion, 3)

    # IPAddress.GetVersionFromAddressFamily
    self.assertEqual(
        netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET),
        constants.IP4_VERSION)
    self.assertEqual(
        netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET6),
        constants.IP6_VERSION)
    self.assertRaises(errors.ProgrammerError,
        netutils.IPAddress.GetVersionFromAddressFamily, socket.AF_UNIX)
222
223
class TestIP4Address(unittest.TestCase):
  """Tests for the IPv4 specific L{netutils.IP4Address} class"""

  def testGetIPIntFromString(self):
    fn = netutils.IP4Address._GetIPIntFromString
    self.assertEqual(fn("0.0.0.0"), 0)
    self.assertEqual(fn("0.0.0.1"), 1)
    self.assertEqual(fn("127.0.0.1"), 2130706433)
    self.assertEqual(fn("192.0.2.129"), 3221226113)
    self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
    # This check used to be written twice in a row; once is enough
    self.assertNotEqual(fn("0.0.0.0"), 1)

  def testIsValid(self):
    self.assertTrue(netutils.IP4Address.IsValid("0.0.0.0"))
    self.assertTrue(netutils.IP4Address.IsValid("127.0.0.1"))
    self.assertTrue(netutils.IP4Address.IsValid("192.0.2.199"))
    self.assertTrue(netutils.IP4Address.IsValid("255.255.255.255"))

  def testNotIsValid(self):
    # Too few octets, octet out of range and an IPv6 address
    self.assertFalse(netutils.IP4Address.IsValid("0"))
    self.assertFalse(netutils.IP4Address.IsValid("1"))
    self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
    self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
    self.assertFalse(netutils.IP4Address.IsValid("::1"))

  def testInNetwork(self):
    self.assertTrue(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))

  def testNotInNetwork(self):
    self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
                                                   "127.0.0.1"))

  def testIsLoopback(self):
    self.assertTrue(netutils.IP4Address.IsLoopback("127.0.0.1"))

  def testNotIsLoopback(self):
    self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
260
261
class TestIP6Address(unittest.TestCase):
  """Tests for the IPv6 specific L{netutils.IP6Address} class"""

  def testGetIPIntFromString(self):
    fn = netutils.IP6Address._GetIPIntFromString
    self.assertEqual(fn("::"), 0)
    self.assertEqual(fn("::1"), 1)
    # No "L" suffix: integer literals of this size are promoted to long
    # automatically, and the suffix is a syntax error on Python 3
    self.assertEqual(fn("2001:db8::1"),
                     42540766411282592856903984951653826561)
    self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
    self.assertNotEqual(fn("::2"), 1)

  def testIsValid(self):
    # "::" used to be checked twice; the duplicate has been dropped
    self.assertTrue(netutils.IP6Address.IsValid("::"))
    self.assertTrue(netutils.IP6Address.IsValid("::1"))
    self.assertTrue(netutils.IP6Address.IsValid("1" + (":1" * 7)))
    self.assertTrue(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))

  def testNotIsValid(self):
    self.assertFalse(netutils.IP6Address.IsValid("0"))
    self.assertFalse(netutils.IP6Address.IsValid(":1"))
    # Seven groups without "::" compression
    self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
    # Non-hexadecimal digit, respectively a group with five digits
    self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
    self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
    # Nine groups
    self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
    self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))

  def testInNetwork(self):
    self.assertTrue(netutils.IP6Address.InNetwork("::1/128", "::1"))

  def testNotInNetwork(self):
    self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))

  def testIsLoopback(self):
    self.assertTrue(netutils.IP6Address.IsLoopback("::1"))

  def testNotIsLoopback(self):
    self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
299
300
class _BaseTcpPingTest:
  """Base class for TcpPing tests against listen(2)ing port

  Meant to be mixed into a C{unittest.TestCase} subclass which sets
  C{family} (socket address family) and C{address} (address to bind to and
  to ping) and calls L{setUp} and L{tearDown} from its own fixtures.

  """
  family = None
  address = None

  def setUp(self):
    self.listener = socket.socket(self.family, socket.SOCK_STREAM)
    # Port 0 lets the kernel choose a free port, which is then read back
    self.listener.bind((self.address, 0))
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    # Close the socket explicitly rather than relying on garbage collection
    # once the attribute is deleted, and clean up even if shutdown fails
    try:
      self.listener.shutdown(socket.SHUT_RDWR)
    finally:
      self.listener.close()
      del self.listener
      del self.listenerport

  def testTcpPingToLocalHostAccept(self):
    # Once with an explicit source address ...
    self.assertTrue(netutils.TcpPing(self.address,
                                     self.listenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=True,
                                     source=self.address,
                                     ),
                    "failed to connect to test listener")

    # ... and once without
    self.assertTrue(netutils.TcpPing(self.address, self.listenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=True),
                    "failed to connect to test listener (no source)")
330
331
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
  """TcpPing over IPv4 against a listen(2)ing port on the localhost address"""
  address = constants.IP4_ADDRESS_LOCALHOST
  family = socket.AF_INET

  def setUp(self):
    # Run the fixtures of both bases explicitly, TestCase first
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.setUp(self)

  def tearDown(self):
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.tearDown(self)
344
345
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
  """TcpPing over IPv6 against a listen(2)ing port on the localhost address"""
  address = constants.IP6_ADDRESS_LOCALHOST
  family = socket.AF_INET6

  def setUp(self):
    # Run the fixtures of both bases explicitly, TestCase first
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.setUp(self)

  def tearDown(self):
    for base in (unittest.TestCase, _BaseTcpPingTest):
      base.tearDown(self)
358
359
class _BaseTcpPingDeafTest:
  """Base class for TcpPing tests against non listen(2)ing port

  Meant to be mixed into a C{unittest.TestCase} subclass which sets
  C{family} (socket address family) and C{address} (address to bind to and
  to ping) and calls L{setUp} and L{tearDown} from its own fixtures.

  """
  family = None
  address = None

  def setUp(self):
    # Bound, but listen() is never called: the port is reserved for the
    # test while no connection to it can be established
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
    self.deaflistener.bind((self.address, 0))
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    # Close the socket explicitly rather than relying on garbage collection
    # once the attribute is deleted
    self.deaflistener.close()
    del self.deaflistener
    del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
    self.assertFalse(netutils.TcpPing(self.address,
                                      self.deaflistenerport,
                                      timeout=constants.TCP_PING_TIMEOUT,
                                      live_port_needed=True,
                                      source=self.address,
                                      ), # need successful connect(2)
                     "successfully connected to deaf listener")

    self.assertFalse(netutils.TcpPing(self.address,
                                      self.deaflistenerport,
                                      timeout=constants.TCP_PING_TIMEOUT,
                                      live_port_needed=True,
                                      ), # need successful connect(2)
                     "successfully connected to deaf listener (no source)")

  def testTcpPingToLocalHostNoAccept(self):
    self.assertTrue(netutils.TcpPing(self.address,
                                     self.deaflistenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=False,
                                     source=self.address,
                                     ), # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port")

    self.assertTrue(netutils.TcpPing(self.address,
                                     self.deaflistenerport,
                                     timeout=constants.TCP_PING_TIMEOUT,
                                     live_port_needed=False,
                                     ), # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port (no source)")
405
406
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
  """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
  family = socket.AF_INET
  address = constants.IP4_ADDRESS_LOCALHOST

  # The fixtures delegate to the shared base class, exactly like the IPv6
  # variant does, instead of carrying a private copy of its code

  def setUp(self):
    unittest.TestCase.setUp(self)
    _BaseTcpPingDeafTest.setUp(self)

  def tearDown(self):
    unittest.TestCase.tearDown(self)
    _BaseTcpPingDeafTest.tearDown(self)
420
421
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
  """TcpPing over IPv6 against a bound but non listen(2)ing port"""
  address = constants.IP6_ADDRESS_LOCALHOST
  family = socket.AF_INET6

  def setUp(self):
    # Run the fixtures of both bases explicitly, TestCase first
    for base in (unittest.TestCase, _BaseTcpPingDeafTest):
      base.setUp(self)

  def tearDown(self):
    for base in (unittest.TestCase, _BaseTcpPingDeafTest):
      base.tearDown(self)
434
435
class TestFormatAddress(unittest.TestCase):
  """Tests for L{netutils.FormatAddress}"""

  def testFormatAddressUnixSocket(self):
    # For Unix sockets the three values are rendered as pid, uid and gid
    formatted = netutils.FormatAddress(("12352", 0, 0), family=socket.AF_UNIX)
    self.assertEqual(formatted, "pid=12352, uid=0, gid=0")

  def testFormatAddressIP4(self):
    with_port = netutils.FormatAddress(("127.0.0.1", 1234),
                                       family=socket.AF_INET)
    self.assertEqual(with_port, "127.0.0.1:1234")
    # A port of None is left out of the result
    without_port = netutils.FormatAddress(("192.0.2.32", None),
                                          family=socket.AF_INET)
    self.assertEqual(without_port, "192.0.2.32")

  def testFormatAddressIP6(self):
    # IPv6 addresses are wrapped in square brackets
    with_port = netutils.FormatAddress(("::1", 1234), family=socket.AF_INET6)
    self.assertEqual(with_port, "[::1]:1234")
    without_port = netutils.FormatAddress(("::1", None),
                                          family=socket.AF_INET6)
    self.assertEqual(without_port, "[::1]")
    # The port may also be given as a string
    string_port = netutils.FormatAddress(("2001:db8::beef", "80"),
                                         family=socket.AF_INET6)
    self.assertEqual(string_port, "[2001:db8::beef]:80")

  def testFormatAddressWithoutFamily(self):
    # Without an explicit family the format follows the address itself
    ip4_result = netutils.FormatAddress(("127.0.0.1", 1234))
    self.assertEqual(ip4_result, "127.0.0.1:1234")
    ip6_result = netutils.FormatAddress(("::1", 1234))
    self.assertEqual(ip6_result, "[::1]:1234")

  def testInvalidFormatAddress(self):
    # Plain strings instead of (address, port) tuples must be rejected
    for kwargs in [{}, {"family": socket.AF_INET}]:
      self.assertRaises(errors.ParameterError, netutils.FormatAddress,
                        "127.0.0.1", **kwargs)
    self.assertRaises(errors.ParameterError, netutils.FormatAddress,
                      "::1", family=socket.AF_INET)
472
class TestIpParsing(testutils.GanetiTestCase):
  """Test the code that parses the ip command output"""

  def _AssertAllMatch(self, addresses):
    """Check that every address is matched by C{netutils._IP_RE_TEXT}.

    @param addresses: list of address strings expected to match

    """
    for addr in addresses:
      self.assertTrue(re.search(netutils._IP_RE_TEXT, addr),
                      "%r is not matched by _IP_RE_TEXT" % addr)

  def _AssertParsedAddresses(self, test_file, ip4, ip6):
    """Parse a test data file and verify the addresses found in it.

    @param test_file: name of the test data file with "ip" command output
    @param ip4: the single IPv4 address expected, or None if there must not
        be any IPv4 address
    @param ip6: same as C{ip4}, for IPv6

    """
    data = self._ReadTestData(test_file)
    addr = netutils._GetIpAddressesFromIpOutput(data)
    # The parser result is indexed by IP version (4 and 6)
    for (version, expected) in [(4, ip4), (6, ip6)]:
      found = addr[version]
      if expected is None:
        self.assertFalse(found, "%s: unexpected IPv%s addresses %r" %
                         (test_file, version, found))
      else:
        self.assertEqual(len(found), 1,
                         "%s: expected exactly one IPv%s address, got %r" %
                         (test_file, version, found))
        self.assertEqual(found[0], expected)

  def testIp4(self):
    valid_addresses = [constants.IP4_ADDRESS_ANY,
                       constants.IP4_ADDRESS_LOCALHOST,
                       "192.0.2.1",     # RFC5737, IPv4 address blocks for docs
                       "198.51.100.1",
                       "203.0.113.1",
                      ]
    self._AssertAllMatch(valid_addresses)

  def testIp6(self):
    valid_addresses = [constants.IP6_ADDRESS_ANY,
                       constants.IP6_ADDRESS_LOCALHOST,
                       "0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
                       "0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
                       "2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
                       "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
                       "0:0:0:0:0:FFFF:192.0.2.1",  # IPv4-mapped IPv6
                       "::FFFF:192.0.2.1",
                       "0:0:0:0:0:0:203.0.113.1",   # IPv4-compatible IPv6
                       "::203.0.113.1",
                      ]
    self._AssertAllMatch(valid_addresses)

  def testParseIpCommandOutput(self):
    # IPv4-only, fake loopback interface
    for test_file in ["ip-addr-show-lo-ipv4.txt",
                      "ip-addr-show-lo-oneline-ipv4.txt"]:
      self._AssertParsedAddresses(test_file, "127.0.0.1", None)

    # IPv6-only, fake loopback interface
    # The multi-line file used to be listed twice here, so the one-line
    # output format was never exercised for the IPv6-only case
    for test_file in ["ip-addr-show-lo-ipv6.txt",
                      "ip-addr-show-lo-oneline-ipv6.txt"]:
      self._AssertParsedAddresses(test_file, None, "::1")

    # IPv4 and IPv6, fake loopback interface
    for test_file in ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]:
      self._AssertParsedAddresses(test_file, "127.0.0.1", "::1")

    # IPv4 and IPv6, dummy interface
    self._AssertParsedAddresses("ip-addr-show-dummy0.txt", "192.0.2.1",
                                "2001:db8:85a3::8a2e:370:7334")
532
533
# Only when executed as a script (not when imported): hand control to the
# test program provided by the testutils module
if __name__ == "__main__":
  testutils.GanetiTestProgram()