Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.netutils_unittest.py @ df5a5730

History | View | Annotate | Download (17.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the netutils module"""
23

    
24
import os
25
import re
26
import shutil
27
import socket
28
import tempfile
29
import unittest
30

    
31
import testutils
32
from ganeti import constants
33
from ganeti import errors
34
from ganeti import netutils
35
from ganeti import serializer
36
from ganeti import utils
37

    
38

    
39
def _GetSocketCredentials(path):
40
  """Connect to a Unix socket and return remote credentials.
41

42
  """
43
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
44
  try:
45
    sock.settimeout(10)
46
    sock.connect(path)
47
    return netutils.GetSocketCredentials(sock)
48
  finally:
49
    sock.close()
50

    
51

    
52
class TestGetSocketCredentials(unittest.TestCase):
53
  def setUp(self):
54
    self.tmpdir = tempfile.mkdtemp()
55
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")
56

    
57
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
58
    self.listener.settimeout(10)
59
    self.listener.bind(self.sockpath)
60
    self.listener.listen(1)
61

    
62
  def tearDown(self):
63
    self.listener.shutdown(socket.SHUT_RDWR)
64
    self.listener.close()
65
    shutil.rmtree(self.tmpdir)
66

    
67
  def test(self):
68
    (c2pr, c2pw) = os.pipe()
69

    
70
    # Start child process
71
    child = os.fork()
72
    if child == 0:
73
      try:
74
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
75

    
76
        os.write(c2pw, data)
77
        os.close(c2pw)
78

    
79
        os._exit(0)
80
      finally:
81
        os._exit(1)
82

    
83
    os.close(c2pw)
84

    
85
    # Wait for one connection
86
    (conn, _) = self.listener.accept()
87
    conn.recv(1)
88
    conn.close()
89

    
90
    # Wait for result
91
    result = os.read(c2pr, 4096)
92
    os.close(c2pr)
93

    
94
    # Check child's exit code
95
    (_, status) = os.waitpid(child, 0)
96
    self.assertFalse(os.WIFSIGNALED(status))
97
    self.assertEqual(os.WEXITSTATUS(status), 0)
98

    
99
    # Check result
100
    (pid, uid, gid) = serializer.LoadJson(result)
101
    self.assertEqual(pid, os.getpid())
102
    self.assertEqual(uid, os.getuid())
103
    self.assertEqual(gid, os.getgid())
104

    
105

    
106
class TestHostname(unittest.TestCase):
107
  """Testing case for Hostname"""
108

    
109
  def testUppercase(self):
110
    data = "AbC.example.com"
111
    self.assertEqual(netutils.Hostname.GetNormalizedName(data), data.lower())
112

    
113
  def testTooLongName(self):
114
    data = "a.b." + "c" * 255
115
    self.assertRaises(errors.OpPrereqError,
116
                      netutils.Hostname.GetNormalizedName, data)
117

    
118
  def testTrailingDot(self):
119
    data = "a.b.c"
120
    self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
121

    
122
  def testInvalidName(self):
123
    data = [
124
      "a b",
125
      "a/b",
126
      ".a.b",
127
      "a..b",
128
      ]
129
    for value in data:
130
      self.assertRaises(errors.OpPrereqError,
131
                        netutils.Hostname.GetNormalizedName, value)
132

    
133
  def testValidName(self):
134
    data = [
135
      "a.b",
136
      "a-b",
137
      "a_b",
138
      "a.b.c",
139
      ]
140
    for value in data:
141
      self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
142

    
143

    
144
class TestIPAddress(unittest.TestCase):
145
  def testIsValid(self):
146
    self.assert_(netutils.IPAddress.IsValid("0.0.0.0"))
147
    self.assert_(netutils.IPAddress.IsValid("127.0.0.1"))
148
    self.assert_(netutils.IPAddress.IsValid("::"))
149
    self.assert_(netutils.IPAddress.IsValid("::1"))
150

    
151
  def testNotIsValid(self):
152
    self.assertFalse(netutils.IPAddress.IsValid("0"))
153
    self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
154
    self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))
155

    
156
  def testGetAddressFamily(self):
157
    fn = netutils.IPAddress.GetAddressFamily
158
    self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
159
    self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
160
    self.assertEqual(fn("::1"), socket.AF_INET6)
161
    self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
162
    self.assertRaises(errors.IPAddressError, fn, "0")
163

    
164
  def testOwnLoopback(self):
165
    # FIXME: In a pure IPv6 environment this is no longer true
166
    self.assert_(netutils.IPAddress.Own("127.0.0.1"),
167
                 "Should own 127.0.0.1 address")
168

    
169
  def testNotOwnAddress(self):
170
    self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
171
                     "Should not own IP address 2001:db8::1")
172
    self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
173
                     "Should not own IP address 192.0.2.1")
174

    
175
  def testFamilyVersionConversions(self):
176
    # IPAddress.GetAddressFamilyFromVersion
177
    self.assertEqual(
178
        netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP4_VERSION),
179
        socket.AF_INET)
180
    self.assertEqual(
181
        netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP6_VERSION),
182
        socket.AF_INET6)
183
    self.assertRaises(errors.ProgrammerError,
184
        netutils.IPAddress.GetAddressFamilyFromVersion, 3)
185

    
186
    # IPAddress.GetVersionFromAddressFamily
187
    self.assertEqual(
188
        netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET),
189
        constants.IP4_VERSION)
190
    self.assertEqual(
191
        netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET6),
192
        constants.IP6_VERSION)
193
    self.assertRaises(errors.ProgrammerError,
194
        netutils.IPAddress.GetVersionFromAddressFamily, socket.AF_UNIX)
195

    
196

    
197
class TestIP4Address(unittest.TestCase):
198
  def testGetIPIntFromString(self):
199
    fn = netutils.IP4Address._GetIPIntFromString
200
    self.assertEqual(fn("0.0.0.0"), 0)
201
    self.assertEqual(fn("0.0.0.1"), 1)
202
    self.assertEqual(fn("127.0.0.1"), 2130706433)
203
    self.assertEqual(fn("192.0.2.129"), 3221226113)
204
    self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
205
    self.assertNotEqual(fn("0.0.0.0"), 1)
206
    self.assertNotEqual(fn("0.0.0.0"), 1)
207

    
208
  def testIsValid(self):
209
    self.assert_(netutils.IP4Address.IsValid("0.0.0.0"))
210
    self.assert_(netutils.IP4Address.IsValid("127.0.0.1"))
211
    self.assert_(netutils.IP4Address.IsValid("192.0.2.199"))
212
    self.assert_(netutils.IP4Address.IsValid("255.255.255.255"))
213

    
214
  def testNotIsValid(self):
215
    self.assertFalse(netutils.IP4Address.IsValid("0"))
216
    self.assertFalse(netutils.IP4Address.IsValid("1"))
217
    self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
218
    self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
219
    self.assertFalse(netutils.IP4Address.IsValid("::1"))
220

    
221
  def testInNetwork(self):
222
    self.assert_(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))
223

    
224
  def testNotInNetwork(self):
225
    self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
226
                                                   "127.0.0.1"))
227

    
228
  def testIsLoopback(self):
229
    self.assert_(netutils.IP4Address.IsLoopback("127.0.0.1"))
230

    
231
  def testNotIsLoopback(self):
232
    self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
233

    
234

    
235
class TestIP6Address(unittest.TestCase):
236
  def testGetIPIntFromString(self):
237
    fn = netutils.IP6Address._GetIPIntFromString
238
    self.assertEqual(fn("::"), 0)
239
    self.assertEqual(fn("::1"), 1)
240
    self.assertEqual(fn("2001:db8::1"),
241
                     42540766411282592856903984951653826561L)
242
    self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
243
    self.assertNotEqual(netutils.IP6Address._GetIPIntFromString("::2"), 1)
244

    
245
  def testIsValid(self):
246
    self.assert_(netutils.IP6Address.IsValid("::"))
247
    self.assert_(netutils.IP6Address.IsValid("::1"))
248
    self.assert_(netutils.IP6Address.IsValid("1" + (":1" * 7)))
249
    self.assert_(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))
250
    self.assert_(netutils.IP6Address.IsValid("::"))
251

    
252
  def testNotIsValid(self):
253
    self.assertFalse(netutils.IP6Address.IsValid("0"))
254
    self.assertFalse(netutils.IP6Address.IsValid(":1"))
255
    self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
256
    self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
257
    self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
258
    self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
259
    self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))
260

    
261
  def testInNetwork(self):
262
    self.assert_(netutils.IP6Address.InNetwork("::1/128", "::1"))
263

    
264
  def testNotInNetwork(self):
265
    self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))
266

    
267
  def testIsLoopback(self):
268
    self.assert_(netutils.IP6Address.IsLoopback("::1"))
269

    
270
  def testNotIsLoopback(self):
271
    self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
272

    
273

    
274
class _BaseTcpPingTest:
275
  """Base class for TcpPing tests against listen(2)ing port"""
276
  family = None
277
  address = None
278

    
279
  def setUp(self):
280
    self.listener = socket.socket(self.family, socket.SOCK_STREAM)
281
    self.listener.bind((self.address, 0))
282
    self.listenerport = self.listener.getsockname()[1]
283
    self.listener.listen(1)
284

    
285
  def tearDown(self):
286
    self.listener.shutdown(socket.SHUT_RDWR)
287
    del self.listener
288
    del self.listenerport
289

    
290
  def testTcpPingToLocalHostAccept(self):
291
    self.assert_(netutils.TcpPing(self.address,
292
                                  self.listenerport,
293
                                  timeout=constants.TCP_PING_TIMEOUT,
294
                                  live_port_needed=True,
295
                                  source=self.address,
296
                                  ),
297
                 "failed to connect to test listener")
298

    
299
    self.assert_(netutils.TcpPing(self.address, self.listenerport,
300
                                  timeout=constants.TCP_PING_TIMEOUT,
301
                                  live_port_needed=True),
302
                 "failed to connect to test listener (no source)")
303

    
304

    
305
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
306
  """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
307
  family = socket.AF_INET
308
  address = constants.IP4_ADDRESS_LOCALHOST
309

    
310
  def setUp(self):
311
    unittest.TestCase.setUp(self)
312
    _BaseTcpPingTest.setUp(self)
313

    
314
  def tearDown(self):
315
    unittest.TestCase.tearDown(self)
316
    _BaseTcpPingTest.tearDown(self)
317

    
318

    
319
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
320
  """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
321
  family = socket.AF_INET6
322
  address = constants.IP6_ADDRESS_LOCALHOST
323

    
324
  def setUp(self):
325
    unittest.TestCase.setUp(self)
326
    _BaseTcpPingTest.setUp(self)
327

    
328
  def tearDown(self):
329
    unittest.TestCase.tearDown(self)
330
    _BaseTcpPingTest.tearDown(self)
331

    
332

    
333
class _BaseTcpPingDeafTest:
334
  """Base class for TcpPing tests against non listen(2)ing port"""
335
  family = None
336
  address = None
337

    
338
  def setUp(self):
339
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
340
    self.deaflistener.bind((self.address, 0))
341
    self.deaflistenerport = self.deaflistener.getsockname()[1]
342

    
343
  def tearDown(self):
344
    del self.deaflistener
345
    del self.deaflistenerport
346

    
347
  def testTcpPingToLocalHostAcceptDeaf(self):
348
    self.assertFalse(netutils.TcpPing(self.address,
349
                                      self.deaflistenerport,
350
                                      timeout=constants.TCP_PING_TIMEOUT,
351
                                      live_port_needed=True,
352
                                      source=self.address,
353
                                      ), # need successful connect(2)
354
                     "successfully connected to deaf listener")
355

    
356
    self.assertFalse(netutils.TcpPing(self.address,
357
                                      self.deaflistenerport,
358
                                      timeout=constants.TCP_PING_TIMEOUT,
359
                                      live_port_needed=True,
360
                                      ), # need successful connect(2)
361
                     "successfully connected to deaf listener (no source)")
362

    
363
  def testTcpPingToLocalHostNoAccept(self):
364
    self.assert_(netutils.TcpPing(self.address,
365
                                  self.deaflistenerport,
366
                                  timeout=constants.TCP_PING_TIMEOUT,
367
                                  live_port_needed=False,
368
                                  source=self.address,
369
                                  ), # ECONNREFUSED is OK
370
                 "failed to ping alive host on deaf port")
371

    
372
    self.assert_(netutils.TcpPing(self.address,
373
                                  self.deaflistenerport,
374
                                  timeout=constants.TCP_PING_TIMEOUT,
375
                                  live_port_needed=False,
376
                                  ), # ECONNREFUSED is OK
377
                 "failed to ping alive host on deaf port (no source)")
378

    
379

    
380
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
381
  """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
382
  family = socket.AF_INET
383
  address = constants.IP4_ADDRESS_LOCALHOST
384

    
385
  def setUp(self):
386
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
387
    self.deaflistener.bind((self.address, 0))
388
    self.deaflistenerport = self.deaflistener.getsockname()[1]
389

    
390
  def tearDown(self):
391
    del self.deaflistener
392
    del self.deaflistenerport
393

    
394

    
395
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
396
  """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
397
  family = socket.AF_INET6
398
  address = constants.IP6_ADDRESS_LOCALHOST
399

    
400
  def setUp(self):
401
    unittest.TestCase.setUp(self)
402
    _BaseTcpPingDeafTest.setUp(self)
403

    
404
  def tearDown(self):
405
    unittest.TestCase.tearDown(self)
406
    _BaseTcpPingDeafTest.tearDown(self)
407

    
408

    
409
class TestFormatAddress(unittest.TestCase):
410
  """Testcase for FormatAddress"""
411

    
412
  def testFormatAddressUnixSocket(self):
413
    res1 = netutils.FormatAddress(("12352", 0, 0), family=socket.AF_UNIX)
414
    self.assertEqual(res1, "pid=12352, uid=0, gid=0")
415

    
416
  def testFormatAddressIP4(self):
417
    res1 = netutils.FormatAddress(("127.0.0.1", 1234), family=socket.AF_INET)
418
    self.assertEqual(res1, "127.0.0.1:1234")
419
    res2 = netutils.FormatAddress(("192.0.2.32", None), family=socket.AF_INET)
420
    self.assertEqual(res2, "192.0.2.32")
421

    
422
  def testFormatAddressIP6(self):
423
    res1 = netutils.FormatAddress(("::1", 1234), family=socket.AF_INET6)
424
    self.assertEqual(res1, "[::1]:1234")
425
    res2 = netutils.FormatAddress(("::1", None), family=socket.AF_INET6)
426
    self.assertEqual(res2, "[::1]")
427
    res2 = netutils.FormatAddress(("2001:db8::beef", "80"),
428
                                  family=socket.AF_INET6)
429
    self.assertEqual(res2, "[2001:db8::beef]:80")
430

    
431
  def testFormatAddressWithoutFamily(self):
432
    res1 = netutils.FormatAddress(("127.0.0.1", 1234))
433
    self.assertEqual(res1, "127.0.0.1:1234")
434
    res2 = netutils.FormatAddress(("::1", 1234))
435
    self.assertEqual(res2, "[::1]:1234")
436

    
437

    
438
  def testInvalidFormatAddress(self):
439
    self.assertRaises(errors.ParameterError, netutils.FormatAddress,
440
                      "127.0.0.1")
441
    self.assertRaises(errors.ParameterError, netutils.FormatAddress,
442
                      "127.0.0.1", family=socket.AF_INET)
443
    self.assertRaises(errors.ParameterError, netutils.FormatAddress,
444
                      ("::1"), family=socket.AF_INET )
445

    
446
class TestIpParsing(testutils.GanetiTestCase):
447
  """Test the code that parses the ip command output"""
448

    
449
  def testIp4(self):
450
    valid_addresses = [constants.IP4_ADDRESS_ANY,
451
                       constants.IP4_ADDRESS_LOCALHOST,
452
                       "192.0.2.1",     # RFC5737, IPv4 address blocks for docs
453
                       "198.51.100.1",
454
                       "203.0.113.1",
455
                      ]
456
    for addr in valid_addresses:
457
      self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
458

    
459
  def testIp6(self):
460
    valid_addresses = [constants.IP6_ADDRESS_ANY,
461
                       constants.IP6_ADDRESS_LOCALHOST,
462
                       "0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
463
                       "0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
464
                       "2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
465
                       "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
466
                       "0:0:0:0:0:FFFF:192.0.2.1",  # IPv4-compatible IPv6
467
                       "::FFFF:192.0.2.1",
468
                       "0:0:0:0:0:0:203.0.113.1",   # IPv4-mapped IPv6
469
                       "::203.0.113.1",
470
                      ]
471
    for addr in valid_addresses:
472
      self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
473

    
474
  def testParseIpCommandOutput(self):
475
    # IPv4-only, fake loopback interface
476
    tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
477
    for test_file in tests:
478
      data = self._ReadTestData(test_file)
479
      addr = netutils._GetIpAddressesFromIpOutput(data)
480
      self.failUnless(len(addr[4]) == 1 and addr[4][0] == "127.0.0.1" and not
481
                      addr[6])
482

    
483
    # IPv6-only, fake loopback interface
484
    tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-ipv6.txt"]
485
    for test_file in tests:
486
      data = self._ReadTestData(test_file)
487
      addr = netutils._GetIpAddressesFromIpOutput(data)
488
      self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and not addr[4])
489

    
490
    # IPv4 and IPv6, fake loopback interface
491
    tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
492
    for test_file in tests:
493
      data = self._ReadTestData(test_file)
494
      addr = netutils._GetIpAddressesFromIpOutput(data)
495
      self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and
496
                      len(addr[4]) == 1 and addr[4][0] == "127.0.0.1")
497

    
498
    # IPv4 and IPv6, dummy interface
499
    data = self._ReadTestData("ip-addr-show-dummy0.txt")
500
    addr = netutils._GetIpAddressesFromIpOutput(data)
501
    self.failUnless(len(addr[6]) == 1 and
502
                    addr[6][0] == "2001:db8:85a3::8a2e:370:7334" and
503
                    len(addr[4]) == 1 and
504
                    addr[4][0] == "192.0.2.1")
505

    
506

    
507
if __name__ == "__main__":
508
  testutils.GanetiTestProgram()