Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.netutils_unittest.py @ 1a8337f2

History | View | Annotate | Download (14 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the netutils module"""
23

    
24
import os
25
import shutil
26
import socket
27
import tempfile
28
import unittest
29

    
30
import testutils
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import netutils
34
from ganeti import serializer
35
from ganeti import utils
36

    
37

    
38
def _GetSocketCredentials(path):
39
  """Connect to a Unix socket and return remote credentials.
40

41
  """
42
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
43
  try:
44
    sock.settimeout(10)
45
    sock.connect(path)
46
    return netutils.GetSocketCredentials(sock)
47
  finally:
48
    sock.close()
49

    
50

    
51
class TestGetSocketCredentials(unittest.TestCase):
52
  def setUp(self):
53
    self.tmpdir = tempfile.mkdtemp()
54
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")
55

    
56
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
57
    self.listener.settimeout(10)
58
    self.listener.bind(self.sockpath)
59
    self.listener.listen(1)
60

    
61
  def tearDown(self):
62
    self.listener.shutdown(socket.SHUT_RDWR)
63
    self.listener.close()
64
    shutil.rmtree(self.tmpdir)
65

    
66
  def test(self):
67
    (c2pr, c2pw) = os.pipe()
68

    
69
    # Start child process
70
    child = os.fork()
71
    if child == 0:
72
      try:
73
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
74

    
75
        os.write(c2pw, data)
76
        os.close(c2pw)
77

    
78
        os._exit(0)
79
      finally:
80
        os._exit(1)
81

    
82
    os.close(c2pw)
83

    
84
    # Wait for one connection
85
    (conn, _) = self.listener.accept()
86
    conn.recv(1)
87
    conn.close()
88

    
89
    # Wait for result
90
    result = os.read(c2pr, 4096)
91
    os.close(c2pr)
92

    
93
    # Check child's exit code
94
    (_, status) = os.waitpid(child, 0)
95
    self.assertFalse(os.WIFSIGNALED(status))
96
    self.assertEqual(os.WEXITSTATUS(status), 0)
97

    
98
    # Check result
99
    (pid, uid, gid) = serializer.LoadJson(result)
100
    self.assertEqual(pid, os.getpid())
101
    self.assertEqual(uid, os.getuid())
102
    self.assertEqual(gid, os.getgid())
103

    
104

    
105
class TestHostname(unittest.TestCase):
106
  """Testing case for Hostname"""
107

    
108
  def testUppercase(self):
109
    data = "AbC.example.com"
110
    self.assertEqual(netutils.Hostname.GetNormalizedName(data), data.lower())
111

    
112
  def testTooLongName(self):
113
    data = "a.b." + "c" * 255
114
    self.assertRaises(errors.OpPrereqError,
115
                      netutils.Hostname.GetNormalizedName, data)
116

    
117
  def testTrailingDot(self):
118
    data = "a.b.c"
119
    self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
120

    
121
  def testInvalidName(self):
122
    data = [
123
      "a b",
124
      "a/b",
125
      ".a.b",
126
      "a..b",
127
      ]
128
    for value in data:
129
      self.assertRaises(errors.OpPrereqError,
130
                        netutils.Hostname.GetNormalizedName, value)
131

    
132
  def testValidName(self):
133
    data = [
134
      "a.b",
135
      "a-b",
136
      "a_b",
137
      "a.b.c",
138
      ]
139
    for value in data:
140
      self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
141

    
142

    
143
class TestIPAddress(unittest.TestCase):
144
  def testIsValid(self):
145
    self.assert_(netutils.IPAddress.IsValid("0.0.0.0"))
146
    self.assert_(netutils.IPAddress.IsValid("127.0.0.1"))
147
    self.assert_(netutils.IPAddress.IsValid("::"))
148
    self.assert_(netutils.IPAddress.IsValid("::1"))
149

    
150
  def testNotIsValid(self):
151
    self.assertFalse(netutils.IPAddress.IsValid("0"))
152
    self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
153
    self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))
154

    
155

    
156
  def testGetAddressFamily(self):
157
    fn = netutils.IPAddress.GetAddressFamily
158
    self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
159
    self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
160
    self.assertEqual(fn("::1"), socket.AF_INET6)
161
    self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
162
    self.assertRaises(errors.IPAddressError, fn, "0")
163

    
164
  def testOwnLoopback(self):
165
    # FIXME: In a pure IPv6 environment this is no longer true
166
    self.assert_(netutils.IPAddress.Own("127.0.0.1"),
167
                 "Should own 127.0.0.1 address")
168

    
169
  def testNotOwnAddress(self):
170
    self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
171
                     "Should not own IP address 2001:db8::1")
172
    self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
173
                     "Should not own IP address 192.0.2.1")
174

    
175

    
176
class TestIP4Address(unittest.TestCase):
177
  def testGetIPIntFromString(self):
178
    fn = netutils.IP4Address._GetIPIntFromString
179
    self.assertEqual(fn("0.0.0.0"), 0)
180
    self.assertEqual(fn("0.0.0.1"), 1)
181
    self.assertEqual(fn("127.0.0.1"), 2130706433)
182
    self.assertEqual(fn("192.0.2.129"), 3221226113)
183
    self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
184
    self.assertNotEqual(fn("0.0.0.0"), 1)
185
    self.assertNotEqual(fn("0.0.0.0"), 1)
186

    
187
  def testIsValid(self):
188
    self.assert_(netutils.IP4Address.IsValid("0.0.0.0"))
189
    self.assert_(netutils.IP4Address.IsValid("127.0.0.1"))
190
    self.assert_(netutils.IP4Address.IsValid("192.0.2.199"))
191
    self.assert_(netutils.IP4Address.IsValid("255.255.255.255"))
192

    
193
  def testNotIsValid(self):
194
    self.assertFalse(netutils.IP4Address.IsValid("0"))
195
    self.assertFalse(netutils.IP4Address.IsValid("1"))
196
    self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
197
    self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
198
    self.assertFalse(netutils.IP4Address.IsValid("::1"))
199

    
200
  def testInNetwork(self):
201
    self.assert_(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))
202

    
203
  def testNotInNetwork(self):
204
    self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
205
                                                   "127.0.0.1"))
206

    
207
  def testIsLoopback(self):
208
    self.assert_(netutils.IP4Address.IsLoopback("127.0.0.1"))
209

    
210
  def testNotIsLoopback(self):
211
    self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
212

    
213

    
214
class TestIP6Address(unittest.TestCase):
215
  def testGetIPIntFromString(self):
216
    fn = netutils.IP6Address._GetIPIntFromString
217
    self.assertEqual(fn("::"), 0)
218
    self.assertEqual(fn("::1"), 1)
219
    self.assertEqual(fn("2001:db8::1"),
220
                     42540766411282592856903984951653826561L)
221
    self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
222
    self.assertNotEqual(netutils.IP6Address._GetIPIntFromString("::2"), 1)
223

    
224
  def testIsValid(self):
225
    self.assert_(netutils.IP6Address.IsValid("::"))
226
    self.assert_(netutils.IP6Address.IsValid("::1"))
227
    self.assert_(netutils.IP6Address.IsValid("1" + (":1" * 7)))
228
    self.assert_(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))
229
    self.assert_(netutils.IP6Address.IsValid("::"))
230

    
231
  def testNotIsValid(self):
232
    self.assertFalse(netutils.IP6Address.IsValid("0"))
233
    self.assertFalse(netutils.IP6Address.IsValid(":1"))
234
    self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
235
    self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
236
    self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
237
    self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
238
    self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))
239

    
240
  def testInNetwork(self):
241
    self.assert_(netutils.IP6Address.InNetwork("::1/128", "::1"))
242

    
243
  def testNotInNetwork(self):
244
    self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))
245

    
246
  def testIsLoopback(self):
247
    self.assert_(netutils.IP6Address.IsLoopback("::1"))
248

    
249
  def testNotIsLoopback(self):
250
    self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
251

    
252

    
253
class _BaseTcpPingTest:
254
  """Base class for TcpPing tests against listen(2)ing port"""
255
  family = None
256
  address = None
257

    
258
  def setUp(self):
259
    self.listener = socket.socket(self.family, socket.SOCK_STREAM)
260
    self.listener.bind((self.address, 0))
261
    self.listenerport = self.listener.getsockname()[1]
262
    self.listener.listen(1)
263

    
264
  def tearDown(self):
265
    self.listener.shutdown(socket.SHUT_RDWR)
266
    del self.listener
267
    del self.listenerport
268

    
269
  def testTcpPingToLocalHostAccept(self):
270
    self.assert_(netutils.TcpPing(self.address,
271
                                  self.listenerport,
272
                                  timeout=constants.TCP_PING_TIMEOUT,
273
                                  live_port_needed=True,
274
                                  source=self.address,
275
                                  ),
276
                 "failed to connect to test listener")
277

    
278
    self.assert_(netutils.TcpPing(self.address, self.listenerport,
279
                                  timeout=constants.TCP_PING_TIMEOUT,
280
                                  live_port_needed=True),
281
                 "failed to connect to test listener (no source)")
282

    
283

    
284
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
285
  """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
286
  family = socket.AF_INET
287
  address = constants.IP4_ADDRESS_LOCALHOST
288

    
289
  def setUp(self):
290
    unittest.TestCase.setUp(self)
291
    _BaseTcpPingTest.setUp(self)
292

    
293
  def tearDown(self):
294
    unittest.TestCase.tearDown(self)
295
    _BaseTcpPingTest.tearDown(self)
296

    
297

    
298
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
299
  """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
300
  family = socket.AF_INET6
301
  address = constants.IP6_ADDRESS_LOCALHOST
302

    
303
  def setUp(self):
304
    unittest.TestCase.setUp(self)
305
    _BaseTcpPingTest.setUp(self)
306

    
307
  def tearDown(self):
308
    unittest.TestCase.tearDown(self)
309
    _BaseTcpPingTest.tearDown(self)
310

    
311

    
312
class _BaseTcpPingDeafTest:
313
  """Base class for TcpPing tests against non listen(2)ing port"""
314
  family = None
315
  address = None
316

    
317
  def setUp(self):
318
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
319
    self.deaflistener.bind((self.address, 0))
320
    self.deaflistenerport = self.deaflistener.getsockname()[1]
321

    
322
  def tearDown(self):
323
    del self.deaflistener
324
    del self.deaflistenerport
325

    
326
  def testTcpPingToLocalHostAcceptDeaf(self):
327
    self.assertFalse(netutils.TcpPing(self.address,
328
                                      self.deaflistenerport,
329
                                      timeout=constants.TCP_PING_TIMEOUT,
330
                                      live_port_needed=True,
331
                                      source=self.address,
332
                                      ), # need successful connect(2)
333
                     "successfully connected to deaf listener")
334

    
335
    self.assertFalse(netutils.TcpPing(self.address,
336
                                      self.deaflistenerport,
337
                                      timeout=constants.TCP_PING_TIMEOUT,
338
                                      live_port_needed=True,
339
                                      ), # need successful connect(2)
340
                     "successfully connected to deaf listener (no source)")
341

    
342
  def testTcpPingToLocalHostNoAccept(self):
343
    self.assert_(netutils.TcpPing(self.address,
344
                                  self.deaflistenerport,
345
                                  timeout=constants.TCP_PING_TIMEOUT,
346
                                  live_port_needed=False,
347
                                  source=self.address,
348
                                  ), # ECONNREFUSED is OK
349
                 "failed to ping alive host on deaf port")
350

    
351
    self.assert_(netutils.TcpPing(self.address,
352
                                  self.deaflistenerport,
353
                                  timeout=constants.TCP_PING_TIMEOUT,
354
                                  live_port_needed=False,
355
                                  ), # ECONNREFUSED is OK
356
                 "failed to ping alive host on deaf port (no source)")
357

    
358

    
359
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
360
  """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
361
  family = socket.AF_INET
362
  address = constants.IP4_ADDRESS_LOCALHOST
363

    
364
  def setUp(self):
365
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
366
    self.deaflistener.bind((self.address, 0))
367
    self.deaflistenerport = self.deaflistener.getsockname()[1]
368

    
369
  def tearDown(self):
370
    del self.deaflistener
371
    del self.deaflistenerport
372

    
373

    
374
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
375
  """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
376
  family = socket.AF_INET6
377
  address = constants.IP6_ADDRESS_LOCALHOST
378

    
379
  def setUp(self):
380
    unittest.TestCase.setUp(self)
381
    _BaseTcpPingDeafTest.setUp(self)
382

    
383
  def tearDown(self):
384
    unittest.TestCase.tearDown(self)
385
    _BaseTcpPingDeafTest.tearDown(self)
386

    
387

    
388
class TestFormatAddress(unittest.TestCase):
389
  """Testcase for FormatAddress"""
390

    
391
  def testFormatAddressUnixSocket(self):
392
    res1 = netutils.FormatAddress(socket.AF_UNIX, ("12352", 0, 0))
393
    self.assertEqual(res1, "pid=12352, uid=0, gid=0")
394

    
395
  def testFormatAddressIP4(self):
396
    res1 = netutils.FormatAddress(socket.AF_INET, ("127.0.0.1", 1234))
397
    self.assertEqual(res1, "127.0.0.1:1234")
398
    res2 = netutils.FormatAddress(socket.AF_INET, ("192.0.2.32", None))
399
    self.assertEqual(res2, "192.0.2.32")
400

    
401
  def testFormatAddressIP6(self):
402
    res1 = netutils.FormatAddress(socket.AF_INET6, ("::1", 1234))
403
    self.assertEqual(res1, "[::1]:1234")
404
    res2 = netutils.FormatAddress(socket.AF_INET6, ("::1", None))
405
    self.assertEqual(res2, "[::1]")
406
    res2 = netutils.FormatAddress(socket.AF_INET6, ("2001:db8::beef", "80"))
407
    self.assertEqual(res2, "[2001:db8::beef]:80")
408

    
409
  def testInvalidFormatAddress(self):
410
    self.assertRaises(errors.ParameterError,
411
                      netutils.FormatAddress, None, ("::1", None))
412
    self.assertRaises(errors.ParameterError,
413
                      netutils.FormatAddress, socket.AF_INET, "127.0.0.1")
414
    self.assertRaises(errors.ParameterError,
415
                      netutils.FormatAddress, socket.AF_INET, ("::1"))
416

    
417

    
418
if __name__ == "__main__":
419
  testutils.GanetiTestProgram()