Revision bf9bd8dd
b/lib/http/auth.py | ||
---|---|---|
32 | 32 |
|
33 | 33 |
from cStringIO import StringIO |
34 | 34 |
|
35 |
try: |
|
36 |
from hashlib import md5 |
|
37 |
except ImportError: |
|
38 |
from md5 import new as md5 |
|
39 |
|
|
35 | 40 |
|
36 | 41 |
# Digest types from RFC2617 |
37 | 42 |
HTTP_BASIC_AUTH = "Basic" |
... | ... | |
75 | 80 |
# Default authentication realm |
76 | 81 |
AUTH_REALM = None |
77 | 82 |
|
83 |
# Schemes for passwords |
|
84 |
_CLEARTEXT_SCHEME = "{CLEARTEXT}" |
|
85 |
_HA1_SCHEME = "{HA1}" |
|
86 |
|
|
78 | 87 |
def GetAuthRealm(self, req): |
79 | 88 |
"""Returns the authentication realm for a request. |
80 | 89 |
|
... | ... | |
198 | 207 |
""" |
199 | 208 |
raise NotImplementedError() |
200 | 209 |
|
210 |
def VerifyBasicAuthPassword(self, req, username, password, expected): |
|
211 |
"""Checks the password for basic authentication. |
|
212 |
|
|
213 |
As long as they don't start with an opening brace ("{"), old passwords are |
|
214 |
supported. A new scheme uses H(A1) from RFC2617, where H is MD5 and A1 |
|
215 |
consists of the username, the authentication realm and the actual password. |
|
216 |
|
|
217 |
@type req: L{http.server._HttpServerRequest} |
|
218 |
@param req: HTTP request context |
|
219 |
@type username: string |
|
220 |
@param username: Username from HTTP headers |
|
221 |
@type password: string |
|
222 |
@param password: Password from HTTP headers |
|
223 |
@type expected: string |
|
224 |
@param expected: Expected password with optional scheme prefix (e.g. from |
|
225 |
users file) |
|
226 |
|
|
227 |
""" |
|
228 |
# Backwards compatibility for old-style passwords without a scheme |
|
229 |
if not expected.startswith("{"): |
|
230 |
expected = self._CLEARTEXT_SCHEME + expected |
|
231 |
|
|
232 |
# Check again, just to be sure |
|
233 |
if not expected.startswith("{"): |
|
234 |
raise AssertionError("Invalid scheme") |
|
235 |
|
|
236 |
scheme_end_idx = expected.find("}", 1) |
|
237 |
|
|
238 |
# Ensure scheme has a length of at least one character |
|
239 |
if scheme_end_idx <= 1: |
|
240 |
logging.warning("Invalid scheme in password for user '%s'", username) |
|
241 |
return False |
|
242 |
|
|
243 |
scheme = expected[:scheme_end_idx + 1].upper() |
|
244 |
expected_password = expected[scheme_end_idx + 1:] |
|
245 |
|
|
246 |
# Good old plain text password |
|
247 |
if scheme == self._CLEARTEXT_SCHEME: |
|
248 |
return password == expected_password |
|
249 |
|
|
250 |
# H(A1) as described in RFC2617 |
|
251 |
if scheme == self._HA1_SCHEME: |
|
252 |
realm = self.GetAuthRealm(req) |
|
253 |
if not realm: |
|
254 |
# There can not be a valid password for this case |
|
255 |
return False |
|
256 |
|
|
257 |
expha1 = md5() |
|
258 |
expha1.update("%s:%s:%s" % (username, realm, password)) |
|
259 |
|
|
260 |
return (expected_password.lower() == expha1.hexdigest().lower()) |
|
261 |
|
|
262 |
logging.warning("Unknown scheme '%s' in password for user '%s'", |
|
263 |
scheme, username) |
|
264 |
|
|
265 |
return False |
|
266 |
|
|
201 | 267 |
|
202 | 268 |
class PasswordFileUser(object): |
203 | 269 |
"""Data structure for users from password file. |
b/test/ganeti.http_unittest.py | ||
---|---|---|
30 | 30 |
|
31 | 31 |
import ganeti.http.server |
32 | 32 |
import ganeti.http.client |
33 |
import ganeti.http.auth |
|
33 | 34 |
|
34 | 35 |
|
35 | 36 |
class TestStartLines(unittest.TestCase): |
... | ... | |
93 | 94 |
self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0) |
94 | 95 |
|
95 | 96 |
|
97 |
class _FakeRequestAuth(http.auth.HttpServerRequestAuthentication): |
|
98 |
def __init__(self, realm): |
|
99 |
http.auth.HttpServerRequestAuthentication.__init__(self) |
|
100 |
|
|
101 |
self.realm = realm |
|
102 |
|
|
103 |
def GetAuthRealm(self, req): |
|
104 |
return self.realm |
|
105 |
|
|
106 |
|
|
107 |
class TestAuth(unittest.TestCase): |
|
108 |
"""Authentication tests""" |
|
109 |
|
|
110 |
hsra = http.auth.HttpServerRequestAuthentication |
|
111 |
|
|
112 |
def testConstants(self): |
|
113 |
self.assertEqual(self.hsra._CLEARTEXT_SCHEME, |
|
114 |
self.hsra._CLEARTEXT_SCHEME.upper()) |
|
115 |
self.assertEqual(self.hsra._HA1_SCHEME, |
|
116 |
self.hsra._HA1_SCHEME.upper()) |
|
117 |
|
|
118 |
def _testVerifyBasicAuthPassword(self, realm, user, password, expected): |
|
119 |
ra = _FakeRequestAuth(realm) |
|
120 |
|
|
121 |
return ra.VerifyBasicAuthPassword(None, user, password, expected) |
|
122 |
|
|
123 |
|
|
124 |
def testVerifyBasicAuthPassword(self): |
|
125 |
tvbap = self._testVerifyBasicAuthPassword |
|
126 |
|
|
127 |
good_pws = ["pw", "pw{", "pw}", "pw{}", "pw{x}y", "}pw", |
|
128 |
"0", "123", "foo...:xyz", "TeST"] |
|
129 |
|
|
130 |
for pw in good_pws: |
|
131 |
# Try cleartext passwords |
|
132 |
self.assert_(tvbap("abc", "user", pw, pw)) |
|
133 |
self.assert_(tvbap("abc", "user", pw, "{cleartext}" + pw)) |
|
134 |
self.assert_(tvbap("abc", "user", pw, "{ClearText}" + pw)) |
|
135 |
self.assert_(tvbap("abc", "user", pw, "{CLEARTEXT}" + pw)) |
|
136 |
|
|
137 |
# Try with invalid password |
|
138 |
self.failIf(tvbap("abc", "user", pw, "something")) |
|
139 |
|
|
140 |
# Try with invalid scheme |
|
141 |
self.failIf(tvbap("abc", "user", pw, "{000}" + pw)) |
|
142 |
self.failIf(tvbap("abc", "user", pw, "{unk}" + pw)) |
|
143 |
self.failIf(tvbap("abc", "user", pw, "{Unk}" + pw)) |
|
144 |
self.failIf(tvbap("abc", "user", pw, "{UNK}" + pw)) |
|
145 |
|
|
146 |
# Try with invalid scheme format |
|
147 |
self.failIf(tvbap("abc", "user", "pw", "{something")) |
|
148 |
|
|
149 |
# Hash is MD5("user:This is only a test:pw") |
|
150 |
self.assert_(tvbap("This is only a test", "user", "pw", |
|
151 |
"{ha1}92ea58ae804481498c257b2f65561a17")) |
|
152 |
self.assert_(tvbap("This is only a test", "user", "pw", |
|
153 |
"{HA1}92ea58ae804481498c257b2f65561a17")) |
|
154 |
|
|
155 |
self.failIf(tvbap(None, "user", "pw", |
|
156 |
"{HA1}92ea58ae804481498c257b2f65561a17")) |
|
157 |
self.failIf(tvbap("Admin area", "user", "pw", |
|
158 |
"{HA1}92ea58ae804481498c257b2f65561a17")) |
|
159 |
self.failIf(tvbap("This is only a test", "someone", "pw", |
|
160 |
"{HA1}92ea58ae804481498c257b2f65561a17")) |
|
161 |
self.failIf(tvbap("This is only a test", "user", "something", |
|
162 |
"{HA1}92ea58ae804481498c257b2f65561a17")) |
|
163 |
|
|
164 |
|
|
96 | 165 |
if __name__ == '__main__': |
97 | 166 |
unittest.main() |
Also available in: Unified diff