Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.netutils_unittest.py @ a744b676

History | View | Annotate | Download (11 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the netutils module"""
23

    
24
import os
25
import shutil
26
import socket
27
import tempfile
28
import unittest
29

    
30
import testutils
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import netutils
34
from ganeti import serializer
35
from ganeti import utils
36

    
37

    
38
def _GetSocketCredentials(path):
39
  """Connect to a Unix socket and return remote credentials.
40

41
  """
42
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
43
  try:
44
    sock.settimeout(10)
45
    sock.connect(path)
46
    return netutils.GetSocketCredentials(sock)
47
  finally:
48
    sock.close()
49

    
50

    
51
class TestGetSocketCredentials(unittest.TestCase):
52
  def setUp(self):
53
    self.tmpdir = tempfile.mkdtemp()
54
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")
55

    
56
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
57
    self.listener.settimeout(10)
58
    self.listener.bind(self.sockpath)
59
    self.listener.listen(1)
60

    
61
  def tearDown(self):
62
    self.listener.shutdown(socket.SHUT_RDWR)
63
    self.listener.close()
64
    shutil.rmtree(self.tmpdir)
65

    
66
  def test(self):
67
    (c2pr, c2pw) = os.pipe()
68

    
69
    # Start child process
70
    child = os.fork()
71
    if child == 0:
72
      try:
73
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
74

    
75
        os.write(c2pw, data)
76
        os.close(c2pw)
77

    
78
        os._exit(0)
79
      finally:
80
        os._exit(1)
81

    
82
    os.close(c2pw)
83

    
84
    # Wait for one connection
85
    (conn, _) = self.listener.accept()
86
    conn.recv(1)
87
    conn.close()
88

    
89
    # Wait for result
90
    result = os.read(c2pr, 4096)
91
    os.close(c2pr)
92

    
93
    # Check child's exit code
94
    (_, status) = os.waitpid(child, 0)
95
    self.assertFalse(os.WIFSIGNALED(status))
96
    self.assertEqual(os.WEXITSTATUS(status), 0)
97

    
98
    # Check result
99
    (pid, uid, gid) = serializer.LoadJson(result)
100
    self.assertEqual(pid, os.getpid())
101
    self.assertEqual(uid, os.getuid())
102
    self.assertEqual(gid, os.getgid())
103

    
104

    
105
class TestHostInfo(unittest.TestCase):
106
  """Testing case for HostInfo"""
107

    
108
  def testUppercase(self):
109
    data = "AbC.example.com"
110
    self.failUnlessEqual(netutils.HostInfo.NormalizeName(data), data.lower())
111

    
112
  def testTooLongName(self):
113
    data = "a.b." + "c" * 255
114
    self.failUnlessRaises(errors.OpPrereqError,
115
                          netutils.HostInfo.NormalizeName, data)
116

    
117
  def testTrailingDot(self):
118
    data = "a.b.c"
119
    self.failUnlessEqual(netutils.HostInfo.NormalizeName(data + "."), data)
120

    
121
  def testInvalidName(self):
122
    data = [
123
      "a b",
124
      "a/b",
125
      ".a.b",
126
      "a..b",
127
      ]
128
    for value in data:
129
      self.failUnlessRaises(errors.OpPrereqError,
130
                            netutils.HostInfo.NormalizeName, value)
131

    
132
  def testValidName(self):
133
    data = [
134
      "a.b",
135
      "a-b",
136
      "a_b",
137
      "a.b.c",
138
      ]
139
    for value in data:
140
      netutils.HostInfo.NormalizeName(value)
141

    
142

    
143
class TestIsValidIP4(unittest.TestCase):
144
  def test(self):
145
    self.assert_(netutils.IsValidIP4("127.0.0.1"))
146
    self.assert_(netutils.IsValidIP4("0.0.0.0"))
147
    self.assert_(netutils.IsValidIP4("255.255.255.255"))
148
    self.assertFalse(netutils.IsValidIP4("0"))
149
    self.assertFalse(netutils.IsValidIP4("1"))
150
    self.assertFalse(netutils.IsValidIP4("1.1.1"))
151
    self.assertFalse(netutils.IsValidIP4("255.255.255.256"))
152
    self.assertFalse(netutils.IsValidIP4("::1"))
153

    
154

    
155
class TestIsValidIP6(unittest.TestCase):
156
  def test(self):
157
    self.assert_(netutils.IsValidIP6("::"))
158
    self.assert_(netutils.IsValidIP6("::1"))
159
    self.assert_(netutils.IsValidIP6("1" + (":1" * 7)))
160
    self.assert_(netutils.IsValidIP6("ffff" + (":ffff" * 7)))
161
    self.assertFalse(netutils.IsValidIP6("0"))
162
    self.assertFalse(netutils.IsValidIP6(":1"))
163
    self.assertFalse(netutils.IsValidIP6("f" + (":f" * 6)))
164
    self.assertFalse(netutils.IsValidIP6("fffg" + (":ffff" * 7)))
165
    self.assertFalse(netutils.IsValidIP6("fffff" + (":ffff" * 7)))
166
    self.assertFalse(netutils.IsValidIP6("1" + (":1" * 8)))
167
    self.assertFalse(netutils.IsValidIP6("127.0.0.1"))
168

    
169

    
170
class TestIsValidIP(unittest.TestCase):
171
  def test(self):
172
    self.assert_(netutils.IsValidIP("0.0.0.0"))
173
    self.assert_(netutils.IsValidIP("127.0.0.1"))
174
    self.assert_(netutils.IsValidIP("::"))
175
    self.assert_(netutils.IsValidIP("::1"))
176
    self.assertFalse(netutils.IsValidIP("0"))
177
    self.assertFalse(netutils.IsValidIP("1.1.1.256"))
178
    self.assertFalse(netutils.IsValidIP("a:g::1"))
179

    
180

    
181
class TestGetAddressFamily(unittest.TestCase):
182
  def test(self):
183
    self.assertEqual(netutils.GetAddressFamily("127.0.0.1"), socket.AF_INET)
184
    self.assertEqual(netutils.GetAddressFamily("10.2.0.127"), socket.AF_INET)
185
    self.assertEqual(netutils.GetAddressFamily("::1"), socket.AF_INET6)
186
    self.assertEqual(netutils.GetAddressFamily("fe80::a00:27ff:fe08:5048"),
187
                     socket.AF_INET6)
188
    self.assertRaises(errors.GenericError, netutils.GetAddressFamily, "0")
189

    
190

    
191
class _BaseTcpPingTest:
192
  """Base class for TcpPing tests against listen(2)ing port"""
193
  family = None
194
  address = None
195

    
196
  def setUp(self):
197
    self.listener = socket.socket(self.family, socket.SOCK_STREAM)
198
    self.listener.bind((self.address, 0))
199
    self.listenerport = self.listener.getsockname()[1]
200
    self.listener.listen(1)
201

    
202
  def tearDown(self):
203
    self.listener.shutdown(socket.SHUT_RDWR)
204
    del self.listener
205
    del self.listenerport
206

    
207
  def testTcpPingToLocalHostAccept(self):
208
    self.assert_(netutils.TcpPing(self.address,
209
                                  self.listenerport,
210
                                  timeout=constants.TCP_PING_TIMEOUT,
211
                                  live_port_needed=True,
212
                                  source=self.address,
213
                                  ),
214
                 "failed to connect to test listener")
215

    
216
    self.assert_(netutils.TcpPing(self.address, self.listenerport,
217
                                  timeout=constants.TCP_PING_TIMEOUT,
218
                                  live_port_needed=True),
219
                 "failed to connect to test listener (no source)")
220

    
221

    
222
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
223
  """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
224
  family = socket.AF_INET
225
  address = constants.IP4_ADDRESS_LOCALHOST
226

    
227
  def setUp(self):
228
    unittest.TestCase.setUp(self)
229
    _BaseTcpPingTest.setUp(self)
230

    
231
  def tearDown(self):
232
    unittest.TestCase.tearDown(self)
233
    _BaseTcpPingTest.tearDown(self)
234

    
235

    
236
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
237
  """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
238
  family = socket.AF_INET6
239
  address = constants.IP6_ADDRESS_LOCALHOST
240

    
241
  def setUp(self):
242
    unittest.TestCase.setUp(self)
243
    _BaseTcpPingTest.setUp(self)
244

    
245
  def tearDown(self):
246
    unittest.TestCase.tearDown(self)
247
    _BaseTcpPingTest.tearDown(self)
248

    
249

    
250
class _BaseTcpPingDeafTest:
251
  """Base class for TcpPing tests against non listen(2)ing port"""
252
  family = None
253
  address = None
254

    
255
  def setUp(self):
256
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
257
    self.deaflistener.bind((self.address, 0))
258
    self.deaflistenerport = self.deaflistener.getsockname()[1]
259

    
260
  def tearDown(self):
261
    del self.deaflistener
262
    del self.deaflistenerport
263

    
264
  def testTcpPingToLocalHostAcceptDeaf(self):
265
    self.assertFalse(netutils.TcpPing(self.address,
266
                                      self.deaflistenerport,
267
                                      timeout=constants.TCP_PING_TIMEOUT,
268
                                      live_port_needed=True,
269
                                      source=self.address,
270
                                      ), # need successful connect(2)
271
                     "successfully connected to deaf listener")
272

    
273
    self.assertFalse(netutils.TcpPing(self.address,
274
                                      self.deaflistenerport,
275
                                      timeout=constants.TCP_PING_TIMEOUT,
276
                                      live_port_needed=True,
277
                                      ), # need successful connect(2)
278
                     "successfully connected to deaf listener (no source)")
279

    
280
  def testTcpPingToLocalHostNoAccept(self):
281
    self.assert_(netutils.TcpPing(self.address,
282
                                  self.deaflistenerport,
283
                                  timeout=constants.TCP_PING_TIMEOUT,
284
                                  live_port_needed=False,
285
                                  source=self.address,
286
                                  ), # ECONNREFUSED is OK
287
                 "failed to ping alive host on deaf port")
288

    
289
    self.assert_(netutils.TcpPing(self.address,
290
                                  self.deaflistenerport,
291
                                  timeout=constants.TCP_PING_TIMEOUT,
292
                                  live_port_needed=False,
293
                                  ), # ECONNREFUSED is OK
294
                 "failed to ping alive host on deaf port (no source)")
295

    
296

    
297
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
298
  """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
299
  family = socket.AF_INET
300
  address = constants.IP4_ADDRESS_LOCALHOST
301

    
302
  def setUp(self):
303
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
304
    self.deaflistener.bind((self.address, 0))
305
    self.deaflistenerport = self.deaflistener.getsockname()[1]
306

    
307
  def tearDown(self):
308
    del self.deaflistener
309
    del self.deaflistenerport
310

    
311

    
312
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
313
  """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
314
  family = socket.AF_INET6
315
  address = constants.IP6_ADDRESS_LOCALHOST
316

    
317
  def setUp(self):
318
    unittest.TestCase.setUp(self)
319
    _BaseTcpPingDeafTest.setUp(self)
320

    
321
  def tearDown(self):
322
    unittest.TestCase.tearDown(self)
323
    _BaseTcpPingDeafTest.tearDown(self)
324

    
325

    
326
class TestOwnIpAddress(unittest.TestCase):
327
  """Testcase for OwnIpAddress"""
328

    
329
  def testOwnLoopback(self):
330
    """check having the loopback ip"""
331
    self.failUnless(netutils.OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
332
                    "Should own the loopback address")
333

    
334
  def testNowOwnAddress(self):
335
    """check that I don't own an address"""
336

    
337
    # Network 192.0.2.0/24 is reserved for test/documentation as per
338
    # RFC 5735, so we *should* not have an address of this range... if
339
    # this fails, we should extend the test to multiple addresses
340
    DST_IP = "192.0.2.1"
341
    self.failIf(netutils.OwnIpAddress(DST_IP),
342
                "Should not own IP address %s" % DST_IP)
343

    
344

    
345
if __name__ == "__main__":
346
  testutils.GanetiTestProgram()