4 # Copyright (C) 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the http module"""
30 from ganeti import http
32 import ganeti.http.server
33 import ganeti.http.client
34 import ganeti.http.auth
39 class TestStartLines(unittest.TestCase):
40 """Test cases for start line classes"""
42 def testClientToServerStartLine(self):
43 """Test client to server start line (HTTP request)"""
44 start_line = http.HttpClientToServerStartLine("GET", "/", "HTTP/1.1")
45 self.assertEqual(str(start_line), "GET / HTTP/1.1")
47 def testServerToClientStartLine(self):
48 """Test server to client start line (HTTP response)"""
49 start_line = http.HttpServerToClientStartLine("HTTP/1.1", 200, "OK")
50 self.assertEqual(str(start_line), "HTTP/1.1 200 OK")
53 class TestMisc(unittest.TestCase):
54 """Miscellaneous tests"""
56 def _TestDateTimeHeader(self, gmnow, expected):
57 self.assertEqual(http.server._DateTimeHeader(gmnow=gmnow), expected)
59 def testDateTimeHeader(self):
60 """Test ganeti.http._DateTimeHeader"""
61 self._TestDateTimeHeader((2008, 1, 2, 3, 4, 5, 3, 0, 0),
62 "Thu, 02 Jan 2008 03:04:05 GMT")
63 self._TestDateTimeHeader((2008, 1, 1, 0, 0, 0, 0, 0, 0),
64 "Mon, 01 Jan 2008 00:00:00 GMT")
65 self._TestDateTimeHeader((2008, 12, 31, 0, 0, 0, 0, 0, 0),
66 "Mon, 31 Dec 2008 00:00:00 GMT")
67 self._TestDateTimeHeader((2008, 12, 31, 23, 59, 59, 0, 0, 0),
68 "Mon, 31 Dec 2008 23:59:59 GMT")
69 self._TestDateTimeHeader((2008, 12, 31, 0, 0, 0, 6, 0, 0),
70 "Sun, 31 Dec 2008 00:00:00 GMT")
72 def testHttpServerRequest(self):
73 """Test ganeti.http.server._HttpServerRequest"""
74 server_request = http.server._HttpServerRequest("GET", "/", None, None)
76 # These are expected by users of the HTTP server
77 self.assert_(hasattr(server_request, "request_method"))
78 self.assert_(hasattr(server_request, "request_path"))
79 self.assert_(hasattr(server_request, "request_headers"))
80 self.assert_(hasattr(server_request, "request_body"))
81 self.assert_(isinstance(server_request.resp_headers, dict))
82 self.assert_(hasattr(server_request, "private"))
84 def testServerSizeLimits(self):
85 """Test HTTP server size limits"""
86 message_reader_class = http.server._HttpClientToServerMessageReader
87 self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0)
88 self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0)
90 def testFormatAuthHeader(self):
91 self.assertEqual(http.auth._FormatAuthHeader("Basic", {}),
93 self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "bar", }),
95 self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "", }),
97 self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "x,y", }),
103 # It's a dict whose order isn't guaranteed, hence checking a list
104 self.assert_(http.auth._FormatAuthHeader("Digest", params) in
105 ("Digest foo=\"x,y\" realm=secure",
106 "Digest realm=secure foo=\"x,y\""))
109 class _FakeRequestAuth(http.auth.HttpServerRequestAuthentication):
110 def __init__(self, realm, authreq, authenticate_fn):
111 http.auth.HttpServerRequestAuthentication.__init__(self)
114 self.authreq = authreq
115 self.authenticate_fn = authenticate_fn
117 def AuthenticationRequired(self, req):
120 def GetAuthRealm(self, req):
123 def Authenticate(self, *args):
124 if self.authenticate_fn:
125 return self.authenticate_fn(*args)
126 raise NotImplementedError()
129 class TestAuth(unittest.TestCase):
130 """Authentication tests"""
132 hsra = http.auth.HttpServerRequestAuthentication
134 def testConstants(self):
135 for scheme in [self.hsra._CLEARTEXT_SCHEME, self.hsra._HA1_SCHEME]:
136 self.assertEqual(scheme, scheme.upper())
137 self.assert_(scheme.startswith("{"))
138 self.assert_(scheme.endswith("}"))
140 def _testVerifyBasicAuthPassword(self, realm, user, password, expected):
141 ra = _FakeRequestAuth(realm, False, None)
143 return ra.VerifyBasicAuthPassword(None, user, password, expected)
145 def testVerifyBasicAuthPassword(self):
146 tvbap = self._testVerifyBasicAuthPassword
148 good_pws = ["pw", "pw{", "pw}", "pw{}", "pw{x}y", "}pw",
149 "0", "123", "foo...:xyz", "TeST"]
152 # Try cleartext passwords
153 self.assert_(tvbap("abc", "user", pw, pw))
154 self.assert_(tvbap("abc", "user", pw, "{cleartext}" + pw))
155 self.assert_(tvbap("abc", "user", pw, "{ClearText}" + pw))
156 self.assert_(tvbap("abc", "user", pw, "{CLEARTEXT}" + pw))
158 # Try with invalid password
159 self.failIf(tvbap("abc", "user", pw, "something"))
161 # Try with invalid scheme
162 self.failIf(tvbap("abc", "user", pw, "{000}" + pw))
163 self.failIf(tvbap("abc", "user", pw, "{unk}" + pw))
164 self.failIf(tvbap("abc", "user", pw, "{Unk}" + pw))
165 self.failIf(tvbap("abc", "user", pw, "{UNK}" + pw))
167 # Try with invalid scheme format
168 self.failIf(tvbap("abc", "user", "pw", "{something"))
170 # Hash is MD5("user:This is only a test:pw")
171 self.assert_(tvbap("This is only a test", "user", "pw",
172 "{ha1}92ea58ae804481498c257b2f65561a17"))
173 self.assert_(tvbap("This is only a test", "user", "pw",
174 "{HA1}92ea58ae804481498c257b2f65561a17"))
176 self.failUnlessRaises(AssertionError, tvbap, None, "user", "pw",
177 "{HA1}92ea58ae804481498c257b2f65561a17")
178 self.failIf(tvbap("Admin area", "user", "pw",
179 "{HA1}92ea58ae804481498c257b2f65561a17"))
180 self.failIf(tvbap("This is only a test", "someone", "pw",
181 "{HA1}92ea58ae804481498c257b2f65561a17"))
182 self.failIf(tvbap("This is only a test", "user", "something",
183 "{HA1}92ea58ae804481498c257b2f65561a17"))
186 class _SimpleAuthenticator:
187 def __init__(self, user, password):
189 self.password = password
192 def __call__(self, req, user, password):
194 return self.user == user and self.password == password
197 class TestHttpServerRequestAuthentication(unittest.TestCase):
198 def testNoAuth(self):
199 req = http.server._HttpServerRequest("GET", "/", None, None)
200 _FakeRequestAuth("area1", False, None).PreHandleRequest(req)
202 def testNoRealm(self):
203 headers = { http.HTTP_AUTHORIZATION: "", }
204 req = http.server._HttpServerRequest("GET", "/", headers, None)
205 ra = _FakeRequestAuth(None, False, None)
206 self.assertRaises(AssertionError, ra.PreHandleRequest, req)
208 def testNoScheme(self):
209 headers = { http.HTTP_AUTHORIZATION: "", }
210 req = http.server._HttpServerRequest("GET", "/", headers, None)
211 ra = _FakeRequestAuth("area1", False, None)
212 self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
214 def testUnknownScheme(self):
215 headers = { http.HTTP_AUTHORIZATION: "NewStyleAuth abc", }
216 req = http.server._HttpServerRequest("GET", "/", headers, None)
217 ra = _FakeRequestAuth("area1", False, None)
218 self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
220 def testInvalidBase64(self):
221 headers = { http.HTTP_AUTHORIZATION: "Basic x_=_", }
222 req = http.server._HttpServerRequest("GET", "/", headers, None)
223 ra = _FakeRequestAuth("area1", False, None)
224 self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
226 def testAuthForPublicResource(self):
228 http.HTTP_AUTHORIZATION: "Basic %s" % ("foo".encode("base64").strip(), ),
230 req = http.server._HttpServerRequest("GET", "/", headers, None)
231 ra = _FakeRequestAuth("area1", False, None)
232 self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
234 def testAuthForPublicResource(self):
236 http.HTTP_AUTHORIZATION:
237 "Basic %s" % ("foo:bar".encode("base64").strip(), ),
239 req = http.server._HttpServerRequest("GET", "/", headers, None)
240 ac = _SimpleAuthenticator("foo", "bar")
241 ra = _FakeRequestAuth("area1", False, ac)
242 ra.PreHandleRequest(req)
244 req = http.server._HttpServerRequest("GET", "/", headers, None)
245 ac = _SimpleAuthenticator("something", "else")
246 ra = _FakeRequestAuth("area1", False, ac)
247 self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
249 def testInvalidRequestHeader(self):
251 http.HttpUnauthorized: ["", "\t", "-", ".", "@", "<", ">", "Digest",
252 "basic %s" % "foobar".encode("base64").strip()],
253 http.HttpBadRequest: ["Basic"],
256 for exc, headers in checks.items():
258 headers = { http.HTTP_AUTHORIZATION: i, }
259 req = http.server._HttpServerRequest("GET", "/", headers, None)
260 ra = _FakeRequestAuth("area1", False, None)
261 self.assertRaises(exc, ra.PreHandleRequest, req)
263 def testBasicAuth(self):
264 for user in ["", "joe", "user name with spaces"]:
265 for pw in ["", "-", ":", "foobar", "Foo Bar Baz", "@@@", "###",
267 for wrong_pw in [True, False]:
268 basic_auth = "%s:%s" % (user, pw)
270 basic_auth += "WRONG"
272 http.HTTP_AUTHORIZATION:
273 "Basic %s" % (basic_auth.encode("base64").strip(), ),
275 req = http.server._HttpServerRequest("GET", "/", headers, None)
277 ac = _SimpleAuthenticator(user, pw)
278 self.assertFalse(ac.called)
279 ra = _FakeRequestAuth("area1", True, ac)
282 ra.PreHandleRequest(req)
283 except http.HttpUnauthorized, err:
284 www_auth = err.headers[http.HTTP_WWW_AUTHENTICATE]
285 self.assert_(www_auth.startswith(http.auth.HTTP_BASIC_AUTH))
287 self.fail("Didn't raise HttpUnauthorized")
289 ra.PreHandleRequest(req)
290 self.assert_(ac.called)
293 class TestReadPasswordFile(testutils.GanetiTestCase):
295 testutils.GanetiTestCase.setUp(self)
297 self.tmpfile = tempfile.NamedTemporaryFile()
299 def testSimple(self):
300 self.tmpfile.write("user1 password")
303 users = http.auth.ReadPasswordFile(self.tmpfile.name)
304 self.assertEqual(len(users), 1)
305 self.assertEqual(users["user1"].password, "password")
306 self.assertEqual(len(users["user1"].options), 0)
308 def testOptions(self):
309 self.tmpfile.write("# Passwords\n")
310 self.tmpfile.write("user1 password\n")
311 self.tmpfile.write("\n")
312 self.tmpfile.write("# Comment\n")
313 self.tmpfile.write("user2 pw write,read\n")
314 self.tmpfile.write(" \t# Another comment\n")
315 self.tmpfile.write("invalidline\n")
318 users = http.auth.ReadPasswordFile(self.tmpfile.name)
319 self.assertEqual(len(users), 2)
320 self.assertEqual(users["user1"].password, "password")
321 self.assertEqual(len(users["user1"].options), 0)
323 self.assertEqual(users["user2"].password, "pw")
324 self.assertEqual(users["user2"].options, ["write", "read"])
327 class TestClientRequest(unittest.TestCase):
329 cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
330 headers=[], post_data="Hello World")
331 self.assert_(repr(cr).startswith("<"))
333 def testNoHeaders(self):
334 cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
336 self.assert_(isinstance(cr.headers, list))
337 self.assertEqual(cr.headers, [])
338 self.assertEqual(cr.url, "https://localhost:1234/version")
340 def testOldStyleHeaders(self):
342 "Content-type": "text/plain",
343 "Accept": "text/html",
345 cr = http.client.HttpClientRequest("localhost", 16481, "GET", "/vg_list",
347 self.assert_(isinstance(cr.headers, list))
348 self.assertEqual(sorted(cr.headers), [
350 "Content-type: text/plain",
352 self.assertEqual(cr.url, "https://localhost:16481/vg_list")
354 def testNewStyleHeaders(self):
357 "Content-type: text/plain; charset=ascii",
360 cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
362 self.assert_(isinstance(cr.headers, list))
363 self.assertEqual(sorted(cr.headers), sorted(headers))
364 self.assertEqual(cr.url, "https://localhost:1234/version")
366 def testPostData(self):
367 cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version",
368 post_data="Hello World")
369 self.assertEqual(cr.post_data, "Hello World")
371 def testNoPostData(self):
372 cr = http.client.HttpClientRequest("localhost", 1234, "GET", "/version")
373 self.assertEqual(cr.post_data, "")
375 def testIdentity(self):
376 # These should all use different connections, hence also have a different
378 cr1 = http.client.HttpClientRequest("localhost", 1234, "GET", "/version")
379 cr2 = http.client.HttpClientRequest("localhost", 9999, "GET", "/version")
380 cr3 = http.client.HttpClientRequest("node1", 1234, "GET", "/version")
381 cr4 = http.client.HttpClientRequest("node1", 9999, "GET", "/version")
383 self.assertEqual(len(set([cr1.identity, cr2.identity,
384 cr3.identity, cr4.identity])), 4)
386 # But this one should have the same
387 cr1vglist = http.client.HttpClientRequest("localhost", 1234,
389 self.assertEqual(cr1.identity, cr1vglist.identity)
392 class TestClient(unittest.TestCase):
394 pool = http.client.HttpClientPool(None)
395 self.assertFalse(pool._pool)
398 if __name__ == '__main__':
399 testutils.GanetiTestProgram()