4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the netutils module"""
32 from ganeti import constants
33 from ganeti import errors
34 from ganeti import netutils
35 from ganeti import serializer
36 from ganeti import utils
39 def _GetSocketCredentials(path):
40 """Connect to a Unix socket and return remote credentials.
43 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
47 return netutils.GetSocketCredentials(sock)
52 class TestGetSocketCredentials(unittest.TestCase):
54 self.tmpdir = tempfile.mkdtemp()
55 self.sockpath = utils.PathJoin(self.tmpdir, "sock")
57 self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
58 self.listener.settimeout(10)
59 self.listener.bind(self.sockpath)
60 self.listener.listen(1)
63 self.listener.shutdown(socket.SHUT_RDWR)
65 shutil.rmtree(self.tmpdir)
68 (c2pr, c2pw) = os.pipe()
74 data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
85 # Wait for one connection
86 (conn, _) = self.listener.accept()
91 result = os.read(c2pr, 4096)
94 # Check child's exit code
95 (_, status) = os.waitpid(child, 0)
96 self.assertFalse(os.WIFSIGNALED(status))
97 self.assertEqual(os.WEXITSTATUS(status), 0)
100 (pid, uid, gid) = serializer.LoadJson(result)
101 self.assertEqual(pid, os.getpid())
102 self.assertEqual(uid, os.getuid())
103 self.assertEqual(gid, os.getgid())
106 class TestHostname(unittest.TestCase):
107 """Testing case for Hostname"""
def testUppercase(self):
  # Normalization is expected to fold the whole name to lower case
  mixed_case = "AbC.example.com"
  normalized = netutils.Hostname.GetNormalizedName(mixed_case)
  self.assertEqual(normalized, mixed_case.lower())
def testTooLongName(self):
  # A name carrying a 255 character label must be rejected
  overlong = "a.b." + ("c" * 255)
  normalize = netutils.Hostname.GetNormalizedName
  self.assertRaises(errors.OpPrereqError, normalize, overlong)
118 def testTrailingDot(self):
120 self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
122 def testInvalidName(self):
130 self.assertRaises(errors.OpPrereqError,
131 netutils.Hostname.GetNormalizedName, value)
133 def testValidName(self):
141 self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
144 class TestIPAddress(unittest.TestCase):
def testIsValid(self):
  # The generic class has to accept both IPv4 and IPv6 notation.
  # assertTrue replaces the deprecated assert_ alias.
  for address in ("0.0.0.0", "127.0.0.1", "::", "::1"):
    self.assertTrue(netutils.IPAddress.IsValid(address),
                    "%s should be a valid IP address" % address)
def testNotIsValid(self):
  # None of these strings may pass as an IP address of either family
  is_valid = netutils.IPAddress.IsValid
  for bogus in ("0", "1.1.1.256", "a:g::1"):
    self.assertFalse(is_valid(bogus))
def testGetAddressFamily(self):
  family_of = netutils.IPAddress.GetAddressFamily

  # (textual address, socket family it must map to)
  expectations = [
    ("127.0.0.1", socket.AF_INET),
    ("10.2.0.127", socket.AF_INET),
    ("::1", socket.AF_INET6),
    ("2001:db8::1", socket.AF_INET6),
    ]
  for (address, family) in expectations:
    self.assertEqual(family_of(address), family)

  # A string that is no address at all must raise instead of returning
  self.assertRaises(errors.IPAddressError, family_of, "0")
def testOwnLoopback(self):
  # FIXME: In a pure IPv6 environment this is no longer true
  # assertTrue replaces the deprecated assert_ alias.
  owns_loopback = netutils.IPAddress.Own("127.0.0.1")
  self.assertTrue(owns_loopback, "Should own 127.0.0.1 address")
def testNotOwnAddress(self):
  # (address to probe, message shown when the machine claims to own it)
  cases = (
    ("2001:db8::1", "Should not own IP address 2001:db8::1"),
    ("192.0.2.1", "Should not own IP address 192.0.2.1"),
    )
  for (foreign, message) in cases:
    self.assertFalse(netutils.IPAddress.Own(foreign), message)
175 def testFamilyVersionConversions(self):
176 # IPAddress.GetAddressFamilyFromVersion
178 netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP4_VERSION),
181 netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP6_VERSION),
183 self.assertRaises(errors.ProgrammerError,
184 netutils.IPAddress.GetAddressFamilyFromVersion, 3)
186 # IPAddress.GetVersionFromAddressFamily
188 netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET),
189 constants.IP4_VERSION)
191 netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET6),
192 constants.IP6_VERSION)
193 self.assertRaises(errors.ProgrammerError,
194 netutils.IPAddress.GetVersionFromAddressFamily, socket.AF_UNIX)
197 class TestIP4Address(unittest.TestCase):
def testGetIPIntFromString(self):
  to_int = netutils.IP4Address._GetIPIntFromString

  # (dotted quad, expected integer value)
  expectations = [
    ("0.0.0.0", 0),
    ("0.0.0.1", 1),
    ("127.0.0.1", 2130706433),
    ("192.0.2.129", 3221226113),
    ("255.255.255.255", 2**32 - 1),
    ]
  for (address, value) in expectations:
    self.assertEqual(to_int(address), value)

  # Negative check; asserting it once is sufficient
  self.assertNotEqual(to_int("0.0.0.0"), 1)
def testIsValid(self):
  # Covers the lowest and highest address plus two ordinary ones.
  # assertTrue replaces the deprecated assert_ alias.
  valid_addresses = (
    "0.0.0.0",
    "127.0.0.1",
    "192.0.2.199",
    "255.255.255.255",
    )
  for address in valid_addresses:
    self.assertTrue(netutils.IP4Address.IsValid(address),
                    "%s should be a valid IPv4 address" % address)
def testNotIsValid(self):
  is_valid = netutils.IP4Address.IsValid
  rejected = [
    "0",                # single number, no dots
    "1",
    "1.1.1",            # only three octets
    "255.255.255.256",  # last octet out of range
    "::1",              # IPv6 notation
    ]
  for candidate in rejected:
    self.assertFalse(is_valid(candidate))
def testInNetwork(self):
  # 127.0.0.1 lies within 127.0.0.0/8.
  # assertTrue replaces the deprecated assert_ alias.
  in_network = netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1")
  self.assertTrue(in_network)
224 def testNotInNetwork(self):
225 self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
def testIsLoopback(self):
  # assertTrue replaces the deprecated assert_ alias.
  is_loopback = netutils.IP4Address.IsLoopback("127.0.0.1")
  self.assertTrue(is_loopback)
def testNotIsLoopback(self):
  # 192.0.2.1 must not be reported as a loopback address
  is_loopback = netutils.IP4Address.IsLoopback("192.0.2.1")
  self.assertFalse(is_loopback)
class TestIP6Address(unittest.TestCase):
  """Testcase for the IPv6 address class"""

  def testGetIPIntFromString(self):
    fn = netutils.IP6Address._GetIPIntFromString
    self.assertEqual(fn("::"), 0)
    self.assertEqual(fn("::1"), 1)
    # No "L" suffix on the literal: Python 2 promotes it to a long on its
    # own and Python 3 rejects the suffix as a syntax error.
    self.assertEqual(fn("2001:db8::1"),
                     42540766411282592856903984951653826561)
    self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
                     2**128 - 1)
    self.assertNotEqual(fn("::2"), 1)

  def testIsValid(self):
    # assertTrue replaces the deprecated assert_ alias
    valid_addresses = [
      "::",
      "::1",
      "1" + (":1" * 7),        # eight groups, shortest form
      "ffff" + (":ffff" * 7),  # eight groups, highest address
      ]
    for address in valid_addresses:
      self.assertTrue(netutils.IP6Address.IsValid(address),
                      "%s should be a valid IPv6 address" % address)

  def testNotIsValid(self):
    invalid_addresses = [
      "0",
      ":1",                     # single leading colon
      "f" + (":f" * 6),         # only seven groups
      "fffg" + (":ffff" * 7),   # "g" is not a hexadecimal digit
      "fffff" + (":ffff" * 7),  # first group has five digits
      "1" + (":1" * 8),         # nine groups
      "127.0.0.1",              # IPv4 notation
      ]
    for address in invalid_addresses:
      self.assertFalse(netutils.IP6Address.IsValid(address))

  def testInNetwork(self):
    self.assertTrue(netutils.IP6Address.InNetwork("::1/128", "::1"))

  def testNotInNetwork(self):
    self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))

  def testIsLoopback(self):
    self.assertTrue(netutils.IP6Address.IsLoopback("::1"))

  def testNotIsLoopback(self):
    self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
274 class _BaseTcpPingTest:
275 """Base class for TcpPing tests against listen(2)ing port"""
280 self.listener = socket.socket(self.family, socket.SOCK_STREAM)
281 self.listener.bind((self.address, 0))
282 self.listenerport = self.listener.getsockname()[1]
283 self.listener.listen(1)
286 self.listener.shutdown(socket.SHUT_RDWR)
288 del self.listenerport
290 def testTcpPingToLocalHostAccept(self):
291 self.assert_(netutils.TcpPing(self.address,
293 timeout=constants.TCP_PING_TIMEOUT,
294 live_port_needed=True,
297 "failed to connect to test listener")
299 self.assert_(netutils.TcpPing(self.address, self.listenerport,
300 timeout=constants.TCP_PING_TIMEOUT,
301 live_port_needed=True),
302 "failed to connect to test listener (no source)")
305 class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
306 """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
307 family = socket.AF_INET
308 address = constants.IP4_ADDRESS_LOCALHOST
311 unittest.TestCase.setUp(self)
312 _BaseTcpPingTest.setUp(self)
315 unittest.TestCase.tearDown(self)
316 _BaseTcpPingTest.tearDown(self)
319 class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
320 """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
321 family = socket.AF_INET6
322 address = constants.IP6_ADDRESS_LOCALHOST
325 unittest.TestCase.setUp(self)
326 _BaseTcpPingTest.setUp(self)
329 unittest.TestCase.tearDown(self)
330 _BaseTcpPingTest.tearDown(self)
333 class _BaseTcpPingDeafTest:
334 """Base class for TcpPing tests against non listen(2)ing port"""
339 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
340 self.deaflistener.bind((self.address, 0))
341 self.deaflistenerport = self.deaflistener.getsockname()[1]
344 del self.deaflistener
345 del self.deaflistenerport
347 def testTcpPingToLocalHostAcceptDeaf(self):
348 self.assertFalse(netutils.TcpPing(self.address,
349 self.deaflistenerport,
350 timeout=constants.TCP_PING_TIMEOUT,
351 live_port_needed=True,
353 ), # need successful connect(2)
354 "successfully connected to deaf listener")
356 self.assertFalse(netutils.TcpPing(self.address,
357 self.deaflistenerport,
358 timeout=constants.TCP_PING_TIMEOUT,
359 live_port_needed=True,
360 ), # need successful connect(2)
361 "successfully connected to deaf listener (no source)")
363 def testTcpPingToLocalHostNoAccept(self):
364 self.assert_(netutils.TcpPing(self.address,
365 self.deaflistenerport,
366 timeout=constants.TCP_PING_TIMEOUT,
367 live_port_needed=False,
369 ), # ECONNREFUSED is OK
370 "failed to ping alive host on deaf port")
372 self.assert_(netutils.TcpPing(self.address,
373 self.deaflistenerport,
374 timeout=constants.TCP_PING_TIMEOUT,
375 live_port_needed=False,
376 ), # ECONNREFUSED is OK
377 "failed to ping alive host on deaf port (no source)")
380 class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
381 """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
382 family = socket.AF_INET
383 address = constants.IP4_ADDRESS_LOCALHOST
386 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
387 self.deaflistener.bind((self.address, 0))
388 self.deaflistenerport = self.deaflistener.getsockname()[1]
391 del self.deaflistener
392 del self.deaflistenerport
395 class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
396 """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
397 family = socket.AF_INET6
398 address = constants.IP6_ADDRESS_LOCALHOST
401 unittest.TestCase.setUp(self)
402 _BaseTcpPingDeafTest.setUp(self)
405 unittest.TestCase.tearDown(self)
406 _BaseTcpPingDeafTest.tearDown(self)
409 class TestFormatAddress(unittest.TestCase):
410 """Testcase for FormatAddress"""
def testFormatAddressUnixSocket(self):
  # For AF_UNIX the three fields are rendered as pid, uid and gid
  credentials = ("12352", 0, 0)
  formatted = netutils.FormatAddress(credentials, family=socket.AF_UNIX)
  self.assertEqual(formatted, "pid=12352, uid=0, gid=0")
def testFormatAddressIP4(self):
  # With a port the result is "host:port"
  with_port = netutils.FormatAddress(("127.0.0.1", 1234),
                                     family=socket.AF_INET)
  self.assertEqual(with_port, "127.0.0.1:1234")

  # A port of None yields the bare host
  without_port = netutils.FormatAddress(("192.0.2.32", None),
                                        family=socket.AF_INET)
  self.assertEqual(without_port, "192.0.2.32")
def testFormatAddressIP6(self):
  # IPv6 hosts are wrapped in brackets; the port, if any, follows them
  with_port = netutils.FormatAddress(("::1", 1234), family=socket.AF_INET6)
  self.assertEqual(with_port, "[::1]:1234")

  without_port = netutils.FormatAddress(("::1", None),
                                        family=socket.AF_INET6)
  self.assertEqual(without_port, "[::1]")

  # The port may also be passed as a string
  string_port = netutils.FormatAddress(("2001:db8::beef", "80"),
                                       family=socket.AF_INET6)
  self.assertEqual(string_port, "[2001:db8::beef]:80")
def testFormatAddressWithoutFamily(self):
  # No family is passed; the expected output still differs per address type
  expectations = [
    (("127.0.0.1", 1234), "127.0.0.1:1234"),
    (("::1", 1234), "[::1]:1234"),
    ]
  for (address, formatted) in expectations:
    self.assertEqual(netutils.FormatAddress(address), formatted)
438 def testInvalidFormatAddress(self):
439 self.assertRaises(errors.ParameterError, netutils.FormatAddress,
441 self.assertRaises(errors.ParameterError, netutils.FormatAddress,
442 "127.0.0.1", family=socket.AF_INET)
443 self.assertRaises(errors.ParameterError, netutils.FormatAddress,
444 ("::1"), family=socket.AF_INET )
446 class TestIpParsing(testutils.GanetiTestCase):
447 """Test the code that parses the ip command output"""
450 valid_addresses = [constants.IP4_ADDRESS_ANY,
451 constants.IP4_ADDRESS_LOCALHOST,
452 "192.0.2.1", # RFC5737, IPv4 address blocks for docs
456 for addr in valid_addresses:
457 self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
460 valid_addresses = [constants.IP6_ADDRESS_ANY,
461 constants.IP6_ADDRESS_LOCALHOST,
462 "0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
463 "0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
464 "2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
465 "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
466 "0:0:0:0:0:FFFF:192.0.2.1", # IPv4-compatible IPv6
468 "0:0:0:0:0:0:203.0.113.1", # IPv4-mapped IPv6
471 for addr in valid_addresses:
472 self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
def testParseIpCommandOutput(self):
  # The parser result is indexed by IP version (4 and 6); every entry holds
  # the addresses of that version found in the "ip" command output.
  parse = netutils._GetIpAddressesFromIpOutput

  # IPv4-only, fake loopback interface
  tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
  for test_file in tests:
    addr = parse(self._ReadTestData(test_file))
    self.assertEqual(len(addr[4]), 1)
    self.assertEqual(addr[4][0], "127.0.0.1")
    self.assertFalse(addr[6])

  # IPv6-only, fake loopback interface
  # NOTE(review): this list used to name the multi-line fixture twice, so
  # the one-line format was never parsed. The second name follows the
  # pattern of the IPv4 fixtures above; confirm the file exists in the test
  # data directory.
  tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-oneline-ipv6.txt"]
  for test_file in tests:
    addr = parse(self._ReadTestData(test_file))
    self.assertEqual(len(addr[6]), 1)
    self.assertEqual(addr[6][0], "::1")
    self.assertFalse(addr[4])

  # IPv4 and IPv6, fake loopback interface
  tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
  for test_file in tests:
    addr = parse(self._ReadTestData(test_file))
    self.assertEqual(len(addr[6]), 1)
    self.assertEqual(addr[6][0], "::1")
    self.assertEqual(len(addr[4]), 1)
    self.assertEqual(addr[4][0], "127.0.0.1")

  # IPv4 and IPv6, dummy interface
  addr = parse(self._ReadTestData("ip-addr-show-dummy0.txt"))
  self.assertEqual(len(addr[6]), 1)
  self.assertEqual(addr[6][0], "2001:db8:85a3::8a2e:370:7334")
  self.assertEqual(len(addr[4]), 1)
  self.assertEqual(addr[4][0], "192.0.2.1")
# When executed as a script, run the tests via testutils.GanetiTestProgram
if __name__ == "__main__":
  testutils.GanetiTestProgram()