root / test / ganeti.http_unittest.py @ a8950eb7
History | View | Annotate | Download (12.2 kB)
1 |
#!/usr/bin/python
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2007, 2008 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Script for unittesting the http module"""
|
23 |
|
24 |
|
25 |
import os |
26 |
import unittest |
27 |
import time |
28 |
import tempfile |
29 |
|
30 |
from ganeti import http |
31 |
|
32 |
import ganeti.http.server |
33 |
import ganeti.http.client |
34 |
import ganeti.http.auth |
35 |
|
36 |
import testutils |
37 |
|
38 |
|
39 |
class TestStartLines(unittest.TestCase): |
40 |
"""Test cases for start line classes"""
|
41 |
|
42 |
def testClientToServerStartLine(self): |
43 |
"""Test client to server start line (HTTP request)"""
|
44 |
start_line = http.HttpClientToServerStartLine("GET", "/", "HTTP/1.1") |
45 |
self.assertEqual(str(start_line), "GET / HTTP/1.1") |
46 |
|
47 |
def testServerToClientStartLine(self): |
48 |
"""Test server to client start line (HTTP response)"""
|
49 |
start_line = http.HttpServerToClientStartLine("HTTP/1.1", 200, "OK") |
50 |
self.assertEqual(str(start_line), "HTTP/1.1 200 OK") |
51 |
|
52 |
|
53 |
class TestMisc(unittest.TestCase): |
54 |
"""Miscellaneous tests"""
|
55 |
|
56 |
def _TestDateTimeHeader(self, gmnow, expected): |
57 |
self.assertEqual(http.server._DateTimeHeader(gmnow=gmnow), expected)
|
58 |
|
59 |
def testDateTimeHeader(self): |
60 |
"""Test ganeti.http._DateTimeHeader"""
|
61 |
self._TestDateTimeHeader((2008, 1, 2, 3, 4, 5, 3, 0, 0), |
62 |
"Thu, 02 Jan 2008 03:04:05 GMT")
|
63 |
self._TestDateTimeHeader((2008, 1, 1, 0, 0, 0, 0, 0, 0), |
64 |
"Mon, 01 Jan 2008 00:00:00 GMT")
|
65 |
self._TestDateTimeHeader((2008, 12, 31, 0, 0, 0, 0, 0, 0), |
66 |
"Mon, 31 Dec 2008 00:00:00 GMT")
|
67 |
self._TestDateTimeHeader((2008, 12, 31, 23, 59, 59, 0, 0, 0), |
68 |
"Mon, 31 Dec 2008 23:59:59 GMT")
|
69 |
self._TestDateTimeHeader((2008, 12, 31, 0, 0, 0, 6, 0, 0), |
70 |
"Sun, 31 Dec 2008 00:00:00 GMT")
|
71 |
|
72 |
def testHttpServerRequest(self): |
73 |
"""Test ganeti.http.server._HttpServerRequest"""
|
74 |
server_request = http.server._HttpServerRequest("GET", "/", None, None) |
75 |
|
76 |
# These are expected by users of the HTTP server
|
77 |
self.assert_(hasattr(server_request, "request_method")) |
78 |
self.assert_(hasattr(server_request, "request_path")) |
79 |
self.assert_(hasattr(server_request, "request_headers")) |
80 |
self.assert_(hasattr(server_request, "request_body")) |
81 |
self.assert_(isinstance(server_request.resp_headers, dict)) |
82 |
self.assert_(hasattr(server_request, "private")) |
83 |
|
84 |
def testServerSizeLimits(self): |
85 |
"""Test HTTP server size limits"""
|
86 |
message_reader_class = http.server._HttpClientToServerMessageReader |
87 |
self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0) |
88 |
self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0) |
89 |
|
90 |
def testClientSizeLimits(self): |
91 |
"""Test HTTP client size limits"""
|
92 |
message_reader_class = http.client._HttpServerToClientMessageReader |
93 |
self.assert_(message_reader_class.START_LINE_LENGTH_MAX > 0) |
94 |
self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0) |
95 |
|
96 |
def testFormatAuthHeader(self): |
97 |
self.assertEqual(http.auth._FormatAuthHeader("Basic", {}), |
98 |
"Basic")
|
99 |
self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "bar", }), |
100 |
"Basic foo=bar")
|
101 |
self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "", }), |
102 |
"Basic foo=\"\"")
|
103 |
self.assertEqual(http.auth._FormatAuthHeader("Basic", { "foo": "x,y", }), |
104 |
"Basic foo=\"x,y\"")
|
105 |
params = { |
106 |
"foo": "x,y", |
107 |
"realm": "secure", |
108 |
} |
109 |
# It's a dict whose order isn't guaranteed, hence checking a list
|
110 |
self.assert_(http.auth._FormatAuthHeader("Digest", params) in |
111 |
("Digest foo=\"x,y\" realm=secure",
|
112 |
"Digest realm=secure foo=\"x,y\""))
|
113 |
|
114 |
|
115 |
class _FakeRequestAuth(http.auth.HttpServerRequestAuthentication): |
116 |
def __init__(self, realm, authreq, authenticate_fn): |
117 |
http.auth.HttpServerRequestAuthentication.__init__(self)
|
118 |
|
119 |
self.realm = realm
|
120 |
self.authreq = authreq
|
121 |
self.authenticate_fn = authenticate_fn
|
122 |
|
123 |
def AuthenticationRequired(self, req): |
124 |
return self.authreq |
125 |
|
126 |
def GetAuthRealm(self, req): |
127 |
return self.realm |
128 |
|
129 |
def Authenticate(self, *args): |
130 |
if self.authenticate_fn: |
131 |
return self.authenticate_fn(*args) |
132 |
raise NotImplementedError() |
133 |
|
134 |
|
135 |
class TestAuth(unittest.TestCase): |
136 |
"""Authentication tests"""
|
137 |
|
138 |
hsra = http.auth.HttpServerRequestAuthentication |
139 |
|
140 |
def testConstants(self): |
141 |
for scheme in [self.hsra._CLEARTEXT_SCHEME, self.hsra._HA1_SCHEME]: |
142 |
self.assertEqual(scheme, scheme.upper())
|
143 |
self.assert_(scheme.startswith("{")) |
144 |
self.assert_(scheme.endswith("}")) |
145 |
|
146 |
def _testVerifyBasicAuthPassword(self, realm, user, password, expected): |
147 |
ra = _FakeRequestAuth(realm, False, None) |
148 |
|
149 |
return ra.VerifyBasicAuthPassword(None, user, password, expected) |
150 |
|
151 |
def testVerifyBasicAuthPassword(self): |
152 |
tvbap = self._testVerifyBasicAuthPassword
|
153 |
|
154 |
good_pws = ["pw", "pw{", "pw}", "pw{}", "pw{x}y", "}pw", |
155 |
"0", "123", "foo...:xyz", "TeST"] |
156 |
|
157 |
for pw in good_pws: |
158 |
# Try cleartext passwords
|
159 |
self.assert_(tvbap("abc", "user", pw, pw)) |
160 |
self.assert_(tvbap("abc", "user", pw, "{cleartext}" + pw)) |
161 |
self.assert_(tvbap("abc", "user", pw, "{ClearText}" + pw)) |
162 |
self.assert_(tvbap("abc", "user", pw, "{CLEARTEXT}" + pw)) |
163 |
|
164 |
# Try with invalid password
|
165 |
self.failIf(tvbap("abc", "user", pw, "something")) |
166 |
|
167 |
# Try with invalid scheme
|
168 |
self.failIf(tvbap("abc", "user", pw, "{000}" + pw)) |
169 |
self.failIf(tvbap("abc", "user", pw, "{unk}" + pw)) |
170 |
self.failIf(tvbap("abc", "user", pw, "{Unk}" + pw)) |
171 |
self.failIf(tvbap("abc", "user", pw, "{UNK}" + pw)) |
172 |
|
173 |
# Try with invalid scheme format
|
174 |
self.failIf(tvbap("abc", "user", "pw", "{something")) |
175 |
|
176 |
# Hash is MD5("user:This is only a test:pw")
|
177 |
self.assert_(tvbap("This is only a test", "user", "pw", |
178 |
"{ha1}92ea58ae804481498c257b2f65561a17"))
|
179 |
self.assert_(tvbap("This is only a test", "user", "pw", |
180 |
"{HA1}92ea58ae804481498c257b2f65561a17"))
|
181 |
|
182 |
self.failUnlessRaises(AssertionError, tvbap, None, "user", "pw", |
183 |
"{HA1}92ea58ae804481498c257b2f65561a17")
|
184 |
self.failIf(tvbap("Admin area", "user", "pw", |
185 |
"{HA1}92ea58ae804481498c257b2f65561a17"))
|
186 |
self.failIf(tvbap("This is only a test", "someone", "pw", |
187 |
"{HA1}92ea58ae804481498c257b2f65561a17"))
|
188 |
self.failIf(tvbap("This is only a test", "user", "something", |
189 |
"{HA1}92ea58ae804481498c257b2f65561a17"))
|
190 |
|
191 |
|
192 |
class _SimpleAuthenticator: |
193 |
def __init__(self, user, password): |
194 |
self.user = user
|
195 |
self.password = password
|
196 |
self.called = False |
197 |
|
198 |
def __call__(self, req, user, password): |
199 |
self.called = True |
200 |
return self.user == user and self.password == password |
201 |
|
202 |
|
203 |
class TestHttpServerRequestAuthentication(unittest.TestCase): |
204 |
def testNoAuth(self): |
205 |
req = http.server._HttpServerRequest("GET", "/", None, None) |
206 |
_FakeRequestAuth("area1", False, None).PreHandleRequest(req) |
207 |
|
208 |
def testNoRealm(self): |
209 |
headers = { http.HTTP_AUTHORIZATION: "", }
|
210 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
211 |
ra = _FakeRequestAuth(None, False, None) |
212 |
self.assertRaises(AssertionError, ra.PreHandleRequest, req) |
213 |
|
214 |
def testNoScheme(self): |
215 |
headers = { http.HTTP_AUTHORIZATION: "", }
|
216 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
217 |
ra = _FakeRequestAuth("area1", False, None) |
218 |
self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
|
219 |
|
220 |
def testUnknownScheme(self): |
221 |
headers = { http.HTTP_AUTHORIZATION: "NewStyleAuth abc", }
|
222 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
223 |
ra = _FakeRequestAuth("area1", False, None) |
224 |
self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
|
225 |
|
226 |
def testInvalidBase64(self): |
227 |
headers = { http.HTTP_AUTHORIZATION: "Basic x_=_", }
|
228 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
229 |
ra = _FakeRequestAuth("area1", False, None) |
230 |
self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
|
231 |
|
232 |
def testAuthForPublicResource(self): |
233 |
headers = { |
234 |
http.HTTP_AUTHORIZATION: "Basic %s" % ("foo".encode("base64").strip(), ), |
235 |
} |
236 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
237 |
ra = _FakeRequestAuth("area1", False, None) |
238 |
self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
|
239 |
|
240 |
def testAuthForPublicResource(self): |
241 |
headers = { |
242 |
http.HTTP_AUTHORIZATION: |
243 |
"Basic %s" % ("foo:bar".encode("base64").strip(), ), |
244 |
} |
245 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
246 |
ac = _SimpleAuthenticator("foo", "bar") |
247 |
ra = _FakeRequestAuth("area1", False, ac) |
248 |
ra.PreHandleRequest(req) |
249 |
|
250 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
251 |
ac = _SimpleAuthenticator("something", "else") |
252 |
ra = _FakeRequestAuth("area1", False, ac) |
253 |
self.assertRaises(http.HttpUnauthorized, ra.PreHandleRequest, req)
|
254 |
|
255 |
def testInvalidRequestHeader(self): |
256 |
checks = { |
257 |
http.HttpUnauthorized: ["", "\t", "-", ".", "@", "<", ">", "Digest", |
258 |
"basic %s" % "foobar".encode("base64").strip()], |
259 |
http.HttpBadRequest: ["Basic"],
|
260 |
} |
261 |
|
262 |
for exc, headers in checks.items(): |
263 |
for i in headers: |
264 |
headers = { http.HTTP_AUTHORIZATION: i, } |
265 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
266 |
ra = _FakeRequestAuth("area1", False, None) |
267 |
self.assertRaises(exc, ra.PreHandleRequest, req)
|
268 |
|
269 |
def testBasicAuth(self): |
270 |
for user in ["", "joe", "user name with spaces"]: |
271 |
for pw in ["", "-", ":", "foobar", "Foo Bar Baz", "@@@", "###", |
272 |
"foo:bar:baz"]:
|
273 |
for wrong_pw in [True, False]: |
274 |
basic_auth = "%s:%s" % (user, pw)
|
275 |
if wrong_pw:
|
276 |
basic_auth += "WRONG"
|
277 |
headers = { |
278 |
http.HTTP_AUTHORIZATION: |
279 |
"Basic %s" % (basic_auth.encode("base64").strip(), ), |
280 |
} |
281 |
req = http.server._HttpServerRequest("GET", "/", headers, None) |
282 |
|
283 |
ac = _SimpleAuthenticator(user, pw) |
284 |
self.assertFalse(ac.called)
|
285 |
ra = _FakeRequestAuth("area1", True, ac) |
286 |
if wrong_pw:
|
287 |
try:
|
288 |
ra.PreHandleRequest(req) |
289 |
except http.HttpUnauthorized, err:
|
290 |
www_auth = err.headers[http.HTTP_WWW_AUTHENTICATE] |
291 |
self.assert_(www_auth.startswith(http.auth.HTTP_BASIC_AUTH))
|
292 |
else:
|
293 |
self.fail("Didn't raise HttpUnauthorized") |
294 |
else:
|
295 |
ra.PreHandleRequest(req) |
296 |
self.assert_(ac.called)
|
297 |
|
298 |
|
299 |
class TestReadPasswordFile(testutils.GanetiTestCase): |
300 |
def setUp(self): |
301 |
testutils.GanetiTestCase.setUp(self)
|
302 |
|
303 |
self.tmpfile = tempfile.NamedTemporaryFile()
|
304 |
|
305 |
def testSimple(self): |
306 |
self.tmpfile.write("user1 password") |
307 |
self.tmpfile.flush()
|
308 |
|
309 |
users = http.auth.ReadPasswordFile(self.tmpfile.name)
|
310 |
self.assertEqual(len(users), 1) |
311 |
self.assertEqual(users["user1"].password, "password") |
312 |
self.assertEqual(len(users["user1"].options), 0) |
313 |
|
314 |
def testOptions(self): |
315 |
self.tmpfile.write("# Passwords\n") |
316 |
self.tmpfile.write("user1 password\n") |
317 |
self.tmpfile.write("\n") |
318 |
self.tmpfile.write("# Comment\n") |
319 |
self.tmpfile.write("user2 pw write,read\n") |
320 |
self.tmpfile.write(" \t# Another comment\n") |
321 |
self.tmpfile.write("invalidline\n") |
322 |
self.tmpfile.flush()
|
323 |
|
324 |
users = http.auth.ReadPasswordFile(self.tmpfile.name)
|
325 |
self.assertEqual(len(users), 2) |
326 |
self.assertEqual(users["user1"].password, "password") |
327 |
self.assertEqual(len(users["user1"].options), 0) |
328 |
|
329 |
self.assertEqual(users["user2"].password, "pw") |
330 |
self.assertEqual(users["user2"].options, ["write", "read"]) |
331 |
|
332 |
|
333 |
if __name__ == '__main__': |
334 |
testutils.GanetiTestProgram() |