4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the netutils module"""
32 from ganeti import constants
33 from ganeti import errors
34 from ganeti import netutils
35 from ganeti import serializer
36 from ganeti import utils
39 def _GetSocketCredentials(path):
40 """Connect to a Unix socket and return remote credentials.
43 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
47 return netutils.GetSocketCredentials(sock)
52 class TestGetSocketCredentials(unittest.TestCase):
54 self.tmpdir = tempfile.mkdtemp()
55 self.sockpath = utils.PathJoin(self.tmpdir, "sock")
57 self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
58 self.listener.settimeout(10)
59 self.listener.bind(self.sockpath)
60 self.listener.listen(1)
63 self.listener.shutdown(socket.SHUT_RDWR)
65 shutil.rmtree(self.tmpdir)
68 (c2pr, c2pw) = os.pipe()
74 data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
85 # Wait for one connection
86 (conn, _) = self.listener.accept()
91 result = os.read(c2pr, 4096)
94 # Check child's exit code
95 (_, status) = os.waitpid(child, 0)
96 self.assertFalse(os.WIFSIGNALED(status))
97 self.assertEqual(os.WEXITSTATUS(status), 0)
100 (pid, uid, gid) = serializer.LoadJson(result)
101 self.assertEqual(pid, os.getpid())
102 self.assertEqual(uid, os.getuid())
103 self.assertEqual(gid, os.getgid())
106 class TestHostname(unittest.TestCase):
107 """Testing case for Hostname"""
109 def testUppercase(self):
110 data = "AbC.example.com"
111 self.assertEqual(netutils.Hostname.GetNormalizedName(data), data.lower())
113 def testTooLongName(self):
114 data = "a.b." + "c" * 255
115 self.assertRaises(errors.OpPrereqError,
116 netutils.Hostname.GetNormalizedName, data)
118 def testTrailingDot(self):
120 self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
122 def testInvalidName(self):
130 self.assertRaises(errors.OpPrereqError,
131 netutils.Hostname.GetNormalizedName, value)
133 def testValidName(self):
141 self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
144 class TestIPAddress(unittest.TestCase):
145 def testIsValid(self):
146 self.assert_(netutils.IPAddress.IsValid("0.0.0.0"))
147 self.assert_(netutils.IPAddress.IsValid("127.0.0.1"))
148 self.assert_(netutils.IPAddress.IsValid("::"))
149 self.assert_(netutils.IPAddress.IsValid("::1"))
151 def testNotIsValid(self):
152 self.assertFalse(netutils.IPAddress.IsValid("0"))
153 self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
154 self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))
156 def testGetAddressFamily(self):
157 fn = netutils.IPAddress.GetAddressFamily
158 self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
159 self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
160 self.assertEqual(fn("::1"), socket.AF_INET6)
161 self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
162 self.assertRaises(errors.IPAddressError, fn, "0")
164 def testValidateNetmask(self):
165 for netmask in [0, 33]:
166 self.assertFalse(netutils.IP4Address.ValidateNetmask(netmask))
168 for netmask in [1, 32]:
169 self.assertTrue(netutils.IP4Address.ValidateNetmask(netmask))
171 for netmask in [0, 129]:
172 self.assertFalse(netutils.IP6Address.ValidateNetmask(netmask))
174 for netmask in [1, 128]:
175 self.assertTrue(netutils.IP6Address.ValidateNetmask(netmask))
177 def testGetClassFromX(self):
179 netutils.IPAddress.GetClassFromIpVersion(constants.IP4_VERSION) ==
182 netutils.IPAddress.GetClassFromIpVersion(constants.IP6_VERSION) ==
185 netutils.IPAddress.GetClassFromIpFamily(socket.AF_INET) ==
188 netutils.IPAddress.GetClassFromIpFamily(socket.AF_INET6) ==
191 def testOwnLoopback(self):
192 # FIXME: In a pure IPv6 environment this is no longer true
193 self.assert_(netutils.IPAddress.Own("127.0.0.1"),
194 "Should own 127.0.0.1 address")
196 def testNotOwnAddress(self):
197 self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
198 "Should not own IP address 2001:db8::1")
199 self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
200 "Should not own IP address 192.0.2.1")
202 def testFamilyVersionConversions(self):
203 # IPAddress.GetAddressFamilyFromVersion
205 netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP4_VERSION),
208 netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP6_VERSION),
210 self.assertRaises(errors.ProgrammerError,
211 netutils.IPAddress.GetAddressFamilyFromVersion, 3)
213 # IPAddress.GetVersionFromAddressFamily
215 netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET),
216 constants.IP4_VERSION)
218 netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET6),
219 constants.IP6_VERSION)
220 self.assertRaises(errors.ProgrammerError,
221 netutils.IPAddress.GetVersionFromAddressFamily, socket.AF_UNIX)
224 class TestIP4Address(unittest.TestCase):
225 def testGetIPIntFromString(self):
226 fn = netutils.IP4Address._GetIPIntFromString
227 self.assertEqual(fn("0.0.0.0"), 0)
228 self.assertEqual(fn("0.0.0.1"), 1)
229 self.assertEqual(fn("127.0.0.1"), 2130706433)
230 self.assertEqual(fn("192.0.2.129"), 3221226113)
231 self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
232 self.assertNotEqual(fn("0.0.0.0"), 1)
233 self.assertNotEqual(fn("0.0.0.0"), 1)
235 def testIsValid(self):
236 self.assert_(netutils.IP4Address.IsValid("0.0.0.0"))
237 self.assert_(netutils.IP4Address.IsValid("127.0.0.1"))
238 self.assert_(netutils.IP4Address.IsValid("192.0.2.199"))
239 self.assert_(netutils.IP4Address.IsValid("255.255.255.255"))
241 def testNotIsValid(self):
242 self.assertFalse(netutils.IP4Address.IsValid("0"))
243 self.assertFalse(netutils.IP4Address.IsValid("1"))
244 self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
245 self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
246 self.assertFalse(netutils.IP4Address.IsValid("::1"))
248 def testInNetwork(self):
249 self.assert_(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))
251 def testNotInNetwork(self):
252 self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
255 def testIsLoopback(self):
256 self.assert_(netutils.IP4Address.IsLoopback("127.0.0.1"))
258 def testNotIsLoopback(self):
259 self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
262 class TestIP6Address(unittest.TestCase):
263 def testGetIPIntFromString(self):
264 fn = netutils.IP6Address._GetIPIntFromString
265 self.assertEqual(fn("::"), 0)
266 self.assertEqual(fn("::1"), 1)
267 self.assertEqual(fn("2001:db8::1"),
268 42540766411282592856903984951653826561L)
269 self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
270 self.assertNotEqual(netutils.IP6Address._GetIPIntFromString("::2"), 1)
272 def testIsValid(self):
273 self.assert_(netutils.IP6Address.IsValid("::"))
274 self.assert_(netutils.IP6Address.IsValid("::1"))
275 self.assert_(netutils.IP6Address.IsValid("1" + (":1" * 7)))
276 self.assert_(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))
277 self.assert_(netutils.IP6Address.IsValid("::"))
279 def testNotIsValid(self):
280 self.assertFalse(netutils.IP6Address.IsValid("0"))
281 self.assertFalse(netutils.IP6Address.IsValid(":1"))
282 self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
283 self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
284 self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
285 self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
286 self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))
288 def testInNetwork(self):
289 self.assert_(netutils.IP6Address.InNetwork("::1/128", "::1"))
291 def testNotInNetwork(self):
292 self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))
294 def testIsLoopback(self):
295 self.assert_(netutils.IP6Address.IsLoopback("::1"))
297 def testNotIsLoopback(self):
298 self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
301 class _BaseTcpPingTest:
302 """Base class for TcpPing tests against listen(2)ing port"""
307 self.listener = socket.socket(self.family, socket.SOCK_STREAM)
308 self.listener.bind((self.address, 0))
309 self.listenerport = self.listener.getsockname()[1]
310 self.listener.listen(1)
313 self.listener.shutdown(socket.SHUT_RDWR)
315 del self.listenerport
317 def testTcpPingToLocalHostAccept(self):
318 self.assert_(netutils.TcpPing(self.address,
320 timeout=constants.TCP_PING_TIMEOUT,
321 live_port_needed=True,
324 "failed to connect to test listener")
326 self.assert_(netutils.TcpPing(self.address, self.listenerport,
327 timeout=constants.TCP_PING_TIMEOUT,
328 live_port_needed=True),
329 "failed to connect to test listener (no source)")
332 class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
333 """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
334 family = socket.AF_INET
335 address = constants.IP4_ADDRESS_LOCALHOST
338 unittest.TestCase.setUp(self)
339 _BaseTcpPingTest.setUp(self)
342 unittest.TestCase.tearDown(self)
343 _BaseTcpPingTest.tearDown(self)
346 class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
347 """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
348 family = socket.AF_INET6
349 address = constants.IP6_ADDRESS_LOCALHOST
352 unittest.TestCase.setUp(self)
353 _BaseTcpPingTest.setUp(self)
356 unittest.TestCase.tearDown(self)
357 _BaseTcpPingTest.tearDown(self)
360 class _BaseTcpPingDeafTest:
361 """Base class for TcpPing tests against non listen(2)ing port"""
366 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
367 self.deaflistener.bind((self.address, 0))
368 self.deaflistenerport = self.deaflistener.getsockname()[1]
371 del self.deaflistener
372 del self.deaflistenerport
374 def testTcpPingToLocalHostAcceptDeaf(self):
375 self.assertFalse(netutils.TcpPing(self.address,
376 self.deaflistenerport,
377 timeout=constants.TCP_PING_TIMEOUT,
378 live_port_needed=True,
380 ), # need successful connect(2)
381 "successfully connected to deaf listener")
383 self.assertFalse(netutils.TcpPing(self.address,
384 self.deaflistenerport,
385 timeout=constants.TCP_PING_TIMEOUT,
386 live_port_needed=True,
387 ), # need successful connect(2)
388 "successfully connected to deaf listener (no source)")
390 def testTcpPingToLocalHostNoAccept(self):
391 self.assert_(netutils.TcpPing(self.address,
392 self.deaflistenerport,
393 timeout=constants.TCP_PING_TIMEOUT,
394 live_port_needed=False,
396 ), # ECONNREFUSED is OK
397 "failed to ping alive host on deaf port")
399 self.assert_(netutils.TcpPing(self.address,
400 self.deaflistenerport,
401 timeout=constants.TCP_PING_TIMEOUT,
402 live_port_needed=False,
403 ), # ECONNREFUSED is OK
404 "failed to ping alive host on deaf port (no source)")
407 class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
408 """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
409 family = socket.AF_INET
410 address = constants.IP4_ADDRESS_LOCALHOST
413 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
414 self.deaflistener.bind((self.address, 0))
415 self.deaflistenerport = self.deaflistener.getsockname()[1]
418 del self.deaflistener
419 del self.deaflistenerport
422 class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
423 """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
424 family = socket.AF_INET6
425 address = constants.IP6_ADDRESS_LOCALHOST
428 unittest.TestCase.setUp(self)
429 _BaseTcpPingDeafTest.setUp(self)
432 unittest.TestCase.tearDown(self)
433 _BaseTcpPingDeafTest.tearDown(self)
436 class TestFormatAddress(unittest.TestCase):
437 """Testcase for FormatAddress"""
439 def testFormatAddressUnixSocket(self):
440 res1 = netutils.FormatAddress(("12352", 0, 0), family=socket.AF_UNIX)
441 self.assertEqual(res1, "pid=12352, uid=0, gid=0")
443 def testFormatAddressIP4(self):
444 res1 = netutils.FormatAddress(("127.0.0.1", 1234), family=socket.AF_INET)
445 self.assertEqual(res1, "127.0.0.1:1234")
446 res2 = netutils.FormatAddress(("192.0.2.32", None), family=socket.AF_INET)
447 self.assertEqual(res2, "192.0.2.32")
449 def testFormatAddressIP6(self):
450 res1 = netutils.FormatAddress(("::1", 1234), family=socket.AF_INET6)
451 self.assertEqual(res1, "[::1]:1234")
452 res2 = netutils.FormatAddress(("::1", None), family=socket.AF_INET6)
453 self.assertEqual(res2, "[::1]")
454 res2 = netutils.FormatAddress(("2001:db8::beef", "80"),
455 family=socket.AF_INET6)
456 self.assertEqual(res2, "[2001:db8::beef]:80")
458 def testFormatAddressWithoutFamily(self):
459 res1 = netutils.FormatAddress(("127.0.0.1", 1234))
460 self.assertEqual(res1, "127.0.0.1:1234")
461 res2 = netutils.FormatAddress(("::1", 1234))
462 self.assertEqual(res2, "[::1]:1234")
465 def testInvalidFormatAddress(self):
466 self.assertRaises(errors.ParameterError, netutils.FormatAddress,
468 self.assertRaises(errors.ParameterError, netutils.FormatAddress,
469 "127.0.0.1", family=socket.AF_INET)
470 self.assertRaises(errors.ParameterError, netutils.FormatAddress,
471 ("::1"), family=socket.AF_INET )
473 class TestIpParsing(testutils.GanetiTestCase):
474 """Test the code that parses the ip command output"""
477 valid_addresses = [constants.IP4_ADDRESS_ANY,
478 constants.IP4_ADDRESS_LOCALHOST,
479 "192.0.2.1", # RFC5737, IPv4 address blocks for docs
483 for addr in valid_addresses:
484 self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
487 valid_addresses = [constants.IP6_ADDRESS_ANY,
488 constants.IP6_ADDRESS_LOCALHOST,
489 "0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
490 "0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
491 "2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
492 "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
493 "0:0:0:0:0:FFFF:192.0.2.1", # IPv4-compatible IPv6
495 "0:0:0:0:0:0:203.0.113.1", # IPv4-mapped IPv6
498 for addr in valid_addresses:
499 self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
501 def testParseIpCommandOutput(self):
502 # IPv4-only, fake loopback interface
503 tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
504 for test_file in tests:
505 data = testutils.ReadTestData(test_file)
506 addr = netutils._GetIpAddressesFromIpOutput(data)
507 self.failUnless(len(addr[4]) == 1 and addr[4][0] == "127.0.0.1" and not
510 # IPv6-only, fake loopback interface
511 tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-ipv6.txt"]
512 for test_file in tests:
513 data = testutils.ReadTestData(test_file)
514 addr = netutils._GetIpAddressesFromIpOutput(data)
515 self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and not addr[4])
517 # IPv4 and IPv6, fake loopback interface
518 tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
519 for test_file in tests:
520 data = testutils.ReadTestData(test_file)
521 addr = netutils._GetIpAddressesFromIpOutput(data)
522 self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and
523 len(addr[4]) == 1 and addr[4][0] == "127.0.0.1")
525 # IPv4 and IPv6, dummy interface
526 data = testutils.ReadTestData("ip-addr-show-dummy0.txt")
527 addr = netutils._GetIpAddressesFromIpOutput(data)
528 self.failUnless(len(addr[6]) == 1 and
529 addr[6][0] == "2001:db8:85a3::8a2e:370:7334" and
530 len(addr[4]) == 1 and
531 addr[4][0] == "192.0.2.1")
534 if __name__ == "__main__":
535 testutils.GanetiTestProgram()