4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the netutils module"""
31 from ganeti import constants
32 from ganeti import errors
33 from ganeti import netutils
34 from ganeti import serializer
35 from ganeti import utils
38 def _GetSocketCredentials(path):
39 """Connect to a Unix socket and return remote credentials.
42 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
46 return netutils.GetSocketCredentials(sock)
51 class TestGetSocketCredentials(unittest.TestCase):
53 self.tmpdir = tempfile.mkdtemp()
54 self.sockpath = utils.PathJoin(self.tmpdir, "sock")
56 self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
57 self.listener.settimeout(10)
58 self.listener.bind(self.sockpath)
59 self.listener.listen(1)
62 self.listener.shutdown(socket.SHUT_RDWR)
64 shutil.rmtree(self.tmpdir)
67 (c2pr, c2pw) = os.pipe()
73 data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
84 # Wait for one connection
85 (conn, _) = self.listener.accept()
90 result = os.read(c2pr, 4096)
93 # Check child's exit code
94 (_, status) = os.waitpid(child, 0)
95 self.assertFalse(os.WIFSIGNALED(status))
96 self.assertEqual(os.WEXITSTATUS(status), 0)
99 (pid, uid, gid) = serializer.LoadJson(result)
100 self.assertEqual(pid, os.getpid())
101 self.assertEqual(uid, os.getuid())
102 self.assertEqual(gid, os.getgid())
105 class TestHostname(unittest.TestCase):
106 """Testing case for Hostname"""
def testUppercase(self):
    # Normalizing a mixed-case name must yield its lower-case form.
    mixed_case = "AbC.example.com"
    normalized = netutils.Hostname.GetNormalizedName(mixed_case)
    self.assertEqual(normalized, mixed_case.lower())
def testTooLongName(self):
    # The final label alone is 255 characters long; normalization is
    # expected to refuse the name with OpPrereqError.
    overlong_label = "c" * 255
    hostname = "a.b." + overlong_label
    self.assertRaises(errors.OpPrereqError,
                      netutils.Hostname.GetNormalizedName, hostname)
117 def testTrailingDot(self):
119 self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
121 def testInvalidName(self):
129 self.assertRaises(errors.OpPrereqError,
130 netutils.Hostname.GetNormalizedName, value)
132 def testValidName(self):
140 self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
143 class TestIPAddress(unittest.TestCase):
144 def testIsValid(self):
145 self.assert_(netutils.IPAddress.IsValid("0.0.0.0"))
146 self.assert_(netutils.IPAddress.IsValid("127.0.0.1"))
147 self.assert_(netutils.IPAddress.IsValid("::"))
148 self.assert_(netutils.IPAddress.IsValid("::1"))
150 def testNotIsValid(self):
151 self.assertFalse(netutils.IPAddress.IsValid("0"))
152 self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
153 self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))
156 def testGetAddressFamily(self):
157 fn = netutils.IPAddress.GetAddressFamily
158 self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
159 self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
160 self.assertEqual(fn("::1"), socket.AF_INET6)
161 self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
162 self.assertRaises(errors.IPAddressError, fn, "0")
164 def testOwnLoopback(self):
165 # FIXME: In a pure IPv6 environment this is no longer true
166 self.assert_(netutils.IPAddress.Own("127.0.0.1"),
167 "Should own 127.0.0.1 address")
169 def testNotOwnAddress(self):
170 self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
171 "Should not own IP address 2001:db8::1")
172 self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
173 "Should not own IP address 192.0.2.1")
176 class TestIP4Address(unittest.TestCase):
def testGetIPIntFromString(self):
    # Dotted-quad strings must map to their 32-bit integer value.
    fn = netutils.IP4Address._GetIPIntFromString
    self.assertEqual(fn("0.0.0.0"), 0)
    self.assertEqual(fn("0.0.0.1"), 1)
    self.assertEqual(fn("127.0.0.1"), 2130706433)
    self.assertEqual(fn("192.0.2.129"), 3221226113)
    self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
    # One negative check is enough; repeating it verbatim adds no coverage.
    self.assertNotEqual(fn("0.0.0.0"), 1)
def testIsValid(self):
    # Lowest, loopback, documentation-range and highest IPv4 addresses.
    # assertTrue is used instead of the deprecated alias so the test keeps
    # working on current unittest versions.
    self.assertTrue(netutils.IP4Address.IsValid("0.0.0.0"))
    self.assertTrue(netutils.IP4Address.IsValid("127.0.0.1"))
    self.assertTrue(netutils.IP4Address.IsValid("192.0.2.199"))
    self.assertTrue(netutils.IP4Address.IsValid("255.255.255.255"))
def testNotIsValid(self):
    is_valid = netutils.IP4Address.IsValid
    # Too few octets.
    self.assertFalse(is_valid("0"))
    self.assertFalse(is_valid("1"))
    self.assertFalse(is_valid("1.1.1"))
    # Last octet out of range.
    self.assertFalse(is_valid("255.255.255.256"))
    # An IPv6 literal is not an IPv4 address.
    self.assertFalse(is_valid("::1"))
def testInNetwork(self):
    # 127.0.0.1 lies inside 127.0.0.0/8; assertTrue replaces the deprecated
    # alias.
    self.assertTrue(
        netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))
203 def testNotInNetwork(self):
204 self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
def testIsLoopback(self):
    # 127.0.0.1 must be reported as loopback; assertTrue replaces the
    # deprecated alias.
    self.assertTrue(netutils.IP4Address.IsLoopback("127.0.0.1"))
def testNotIsLoopback(self):
    # An address from the 192.0.2.0/24 range must not count as loopback.
    is_loopback = netutils.IP4Address.IsLoopback("192.0.2.1")
    self.assertFalse(is_loopback)
214 class TestIP6Address(unittest.TestCase):
215 def testGetIPIntFromString(self):
216 fn = netutils.IP6Address._GetIPIntFromString
217 self.assertEqual(fn("::"), 0)
218 self.assertEqual(fn("::1"), 1)
219 self.assertEqual(fn("2001:db8::1"),
220 42540766411282592856903984951653826561L)
221 self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
222 self.assertNotEqual(netutils.IP6Address._GetIPIntFromString("::2"), 1)
224 def testIsValid(self):
225 self.assert_(netutils.IP6Address.IsValid("::"))
226 self.assert_(netutils.IP6Address.IsValid("::1"))
227 self.assert_(netutils.IP6Address.IsValid("1" + (":1" * 7)))
228 self.assert_(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))
229 self.assert_(netutils.IP6Address.IsValid("::"))
231 def testNotIsValid(self):
232 self.assertFalse(netutils.IP6Address.IsValid("0"))
233 self.assertFalse(netutils.IP6Address.IsValid(":1"))
234 self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
235 self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
236 self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
237 self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
238 self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))
240 def testInNetwork(self):
241 self.assert_(netutils.IP6Address.InNetwork("::1/128", "::1"))
243 def testNotInNetwork(self):
244 self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))
246 def testIsLoopback(self):
247 self.assert_(netutils.IP6Address.IsLoopback("::1"))
249 def testNotIsLoopback(self):
250 self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
253 class _BaseTcpPingTest:
254 """Base class for TcpPing tests against listen(2)ing port"""
259 self.listener = socket.socket(self.family, socket.SOCK_STREAM)
260 self.listener.bind((self.address, 0))
261 self.listenerport = self.listener.getsockname()[1]
262 self.listener.listen(1)
265 self.listener.shutdown(socket.SHUT_RDWR)
267 del self.listenerport
269 def testTcpPingToLocalHostAccept(self):
270 self.assert_(netutils.TcpPing(self.address,
272 timeout=constants.TCP_PING_TIMEOUT,
273 live_port_needed=True,
276 "failed to connect to test listener")
278 self.assert_(netutils.TcpPing(self.address, self.listenerport,
279 timeout=constants.TCP_PING_TIMEOUT,
280 live_port_needed=True),
281 "failed to connect to test listener (no source)")
284 class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
285 """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
286 family = socket.AF_INET
287 address = constants.IP4_ADDRESS_LOCALHOST
290 unittest.TestCase.setUp(self)
291 _BaseTcpPingTest.setUp(self)
294 unittest.TestCase.tearDown(self)
295 _BaseTcpPingTest.tearDown(self)
298 class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
299 """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
300 family = socket.AF_INET6
301 address = constants.IP6_ADDRESS_LOCALHOST
304 unittest.TestCase.setUp(self)
305 _BaseTcpPingTest.setUp(self)
308 unittest.TestCase.tearDown(self)
309 _BaseTcpPingTest.tearDown(self)
312 class _BaseTcpPingDeafTest:
313 """Base class for TcpPing tests against non listen(2)ing port"""
318 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
319 self.deaflistener.bind((self.address, 0))
320 self.deaflistenerport = self.deaflistener.getsockname()[1]
323 del self.deaflistener
324 del self.deaflistenerport
326 def testTcpPingToLocalHostAcceptDeaf(self):
327 self.assertFalse(netutils.TcpPing(self.address,
328 self.deaflistenerport,
329 timeout=constants.TCP_PING_TIMEOUT,
330 live_port_needed=True,
332 ), # need successful connect(2)
333 "successfully connected to deaf listener")
335 self.assertFalse(netutils.TcpPing(self.address,
336 self.deaflistenerport,
337 timeout=constants.TCP_PING_TIMEOUT,
338 live_port_needed=True,
339 ), # need successful connect(2)
340 "successfully connected to deaf listener (no source)")
342 def testTcpPingToLocalHostNoAccept(self):
343 self.assert_(netutils.TcpPing(self.address,
344 self.deaflistenerport,
345 timeout=constants.TCP_PING_TIMEOUT,
346 live_port_needed=False,
348 ), # ECONNREFUSED is OK
349 "failed to ping alive host on deaf port")
351 self.assert_(netutils.TcpPing(self.address,
352 self.deaflistenerport,
353 timeout=constants.TCP_PING_TIMEOUT,
354 live_port_needed=False,
355 ), # ECONNREFUSED is OK
356 "failed to ping alive host on deaf port (no source)")
359 class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
360 """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
361 family = socket.AF_INET
362 address = constants.IP4_ADDRESS_LOCALHOST
365 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
366 self.deaflistener.bind((self.address, 0))
367 self.deaflistenerport = self.deaflistener.getsockname()[1]
370 del self.deaflistener
371 del self.deaflistenerport
374 class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
375 """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
376 family = socket.AF_INET6
377 address = constants.IP6_ADDRESS_LOCALHOST
380 unittest.TestCase.setUp(self)
381 _BaseTcpPingDeafTest.setUp(self)
384 unittest.TestCase.tearDown(self)
385 _BaseTcpPingDeafTest.tearDown(self)
class TestFormatAddress(unittest.TestCase):
    """Testcase for FormatAddress"""

    def testFormatAddressUnixSocket(self):
        # For AF_UNIX the "address" is a (pid, uid, gid) credentials triple.
        credentials = ("12352", 0, 0)
        formatted = netutils.FormatAddress(socket.AF_UNIX, credentials)
        self.assertEqual(formatted, "pid=12352, uid=0, gid=0")

    def testFormatAddressIP4(self):
        # (address tuple, expected rendering); a port of None is left out.
        cases = [
            (("127.0.0.1", 1234), "127.0.0.1:1234"),
            (("192.0.2.32", None), "192.0.2.32"),
        ]
        for address, expected in cases:
            self.assertEqual(netutils.FormatAddress(socket.AF_INET, address),
                             expected)

    def testFormatAddressIP6(self):
        # IPv6 hosts are wrapped in brackets; the port may be int or string.
        cases = [
            (("::1", 1234), "[::1]:1234"),
            (("::1", None), "[::1]"),
            (("2001:db8::beef", "80"), "[2001:db8::beef]:80"),
        ]
        for address, expected in cases:
            self.assertEqual(netutils.FormatAddress(socket.AF_INET6, address),
                             expected)

    def testInvalidFormatAddress(self):
        # NOTE(review): ("::1") is a parenthesized string, not a 1-tuple, so
        # the last case passes a plain string just like the one before it.
        # Confirm whether ("::1",) was intended.
        invalid_calls = [
            (None, ("::1", None)),
            (socket.AF_INET, "127.0.0.1"),
            (socket.AF_INET, ("::1")),
        ]
        for family, address in invalid_calls:
            self.assertRaises(errors.ParameterError,
                              netutils.FormatAddress, family, address)
# Run the test program only when this file is executed as a script.
# NOTE(review): no import of testutils is visible in this part of the file;
# confirm it is imported alongside the other modules.
if __name__ == "__main__":
    testutils.GanetiTestProgram()