4 # Copyright (C) 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.server.rapi"""
29 from cStringIO import StringIO
31 from ganeti import constants
32 from ganeti import utils
33 from ganeti import compat
34 from ganeti import errors
35 from ganeti import serializer
36 from ganeti import rapi
37 from ganeti import http
38 from ganeti import objects
40 import ganeti.rapi.baserlib
41 import ganeti.rapi.testutils
42 import ganeti.rapi.rlib2
43 import ganeti.http.auth
# unittest.TestCase subclass holding the tests for the remote API request
# handler.
48 class TestRemoteApiHandler(unittest.TestCase):
# User-lookup callback that ignores its single argument; testQueryAuth passes
# it as "user_fn" and expects HttpUnauthorized for those requests.
# NOTE(review): the body of this helper is not visible in this listing --
# presumably it reports "no such user" (e.g. returns None); confirm. Because
# it takes no "self" yet is referenced as self._LookupWrongUser, it is
# presumably decorated with @staticmethod on a line not shown here.
50 def _LookupWrongUser(_):
# Sends a single request through a rapi.testutils._RapiMock and returns the
# decoded response.
#
# Parameters:
#   method, path: HTTP method and resource path handed to FetchResponse.
#   headers: raw header text; wrapped in a StringIO and parsed with
#     http.ParseHeaders before being passed on.
#   reqbody: request body. NOTE(review): the end of the FetchResponse call is
#     not visible in this listing -- presumably "reqbody" is its final
#     argument; confirm.
#   user_fn, luxi_client: forwarded to _RapiMock; both default to
#     NotImplemented.
#
# Before returning, the helper asserts that the response carries a Date
# header, "Connection: close", a JSON content type and the Ganeti version as
# Server header.
#
# Returns a tuple (resp_code, resp_headers, body) where body is the response
# body run through serializer.LoadJson.
53 def _Test(self, method, path, headers, reqbody,
54 user_fn=NotImplemented, luxi_client=NotImplemented):
55 rm = rapi.testutils._RapiMock(user_fn, luxi_client)
57 (resp_code, resp_headers, resp_body) = \
58 rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)),
61 self.assertTrue(resp_headers[http.HTTP_DATE])
62 self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close")
63 self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON)
64 self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION)
66 return (resp_code, resp_headers, serializer.LoadJson(resp_body))
# NOTE(review): the "def" line introducing these statements is not visible in
# this listing; they look like the body of a separate test method -- confirm.
# Issues GET "/" with no headers and no body, then expects HTTP_OK and a
# decoded body of None.
69 (code, _, data) = self._Test(http.HTTP_GET, "/", "", None)
70 self.assertEqual(code, http.HTTP_OK)
71 self.assertTrue(data is None)
def testVersion(self):
    """Check that GET /version answers HTTP_OK with constants.RAPI_VERSION."""
    response = self._Test(http.HTTP_GET, "/version", "", None)
    (status, _, payload) = response

    self.assertEqual(status, http.HTTP_OK)
    self.assertEqual(payload, constants.RAPI_VERSION)
def testSlashTwo(self):
    """Check that GET /2 answers HTTP_OK and decodes to None."""
    response = self._Test(http.HTTP_GET, "/2", "", None)
    (status, _, payload) = response

    self.assertEqual(status, http.HTTP_OK)
    self.assertTrue(payload is None)
def testFeatures(self):
    """Check that GET /2/features lists exactly rapi.rlib2.ALL_FEATURES."""
    response = self._Test(http.HTTP_GET, "/2/features", "", None)
    (status, _, payload) = response

    self.assertEqual(status, http.HTTP_OK)

    # Order and duplicates are irrelevant, hence the comparison as sets
    reported = set(payload)
    expected = set(rapi.rlib2.ALL_FEATURES)
    self.assertEqual(reported, expected)
def testPutInstances(self):
    """Check that PUT on /2/instances is answered with HttpNotImplemented."""
    response = self._Test(http.HTTP_PUT, "/2/instances", "", None)
    (status, _, payload) = response

    self.assertEqual(status, http.HttpNotImplemented.code)

    message = payload["message"]
    self.assertTrue(message.startswith("Method PUT is unsupported"))
def testPostInstancesNoAuth(self):
    """Check that POST /2/instances without headers is HttpUnauthorized."""
    response = self._Test(http.HTTP_POST, "/2/instances", "", None)
    (status, _, _) = response

    self.assertEqual(status, http.HttpUnauthorized.code)
def testRequestWithUnsupportedMediaType(self):
    """Check that an unknown Content-Type is rejected in several casings."""
    media_type = "un/supported/media/type"

    # As written, upper-cased and title-cased
    variants = [media_type, media_type.upper(), media_type.title()]

    for content_type in variants:
        header_line = "%s: %s" % (http.HTTP_CONTENT_TYPE, content_type)
        headers = rapi.testutils._FormatHeaders([header_line])

        (status, _, payload) = self._Test(http.HTTP_GET, "/", headers, "body")
        self.assertEqual(status, http.HttpUnsupportedMediaType.code)
        self.assertEqual(payload["message"], "Unsupported Media Type")
def testRequestWithInvalidJsonData(self):
    """Check that an unparseable JSON body is answered with HttpBadRequest."""
    invalid_json = "_this/is/no'valid.json"

    # Sanity check: the serializer itself must refuse this payload
    self.assertRaises(Exception, serializer.LoadJson, invalid_json)

    header_line = "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON)
    headers = rapi.testutils._FormatHeaders([header_line])

    response = self._Test(http.HTTP_GET, "/", headers, invalid_json)
    (status, _, payload) = response

    self.assertEqual(status, http.HttpBadRequest.code)
    self.assertEqual(payload["message"], "Unable to parse JSON data")
def testUnsupportedAuthScheme(self):
    """Check that an unknown Authorization scheme is HttpUnauthorized."""
    header_line = "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme")
    headers = rapi.testutils._FormatHeaders([header_line])

    response = self._Test(http.HTTP_POST, "/2/instances", headers, "")
    (status, _, _) = response

    self.assertEqual(status, http.HttpUnauthorized.code)
def testIncompleteBasicAuth(self):
    """Check that "Basic" without credentials is answered with HttpBadRequest."""
    header_line = "%s: Basic" % http.HTTP_AUTHORIZATION
    headers = rapi.testutils._FormatHeaders([header_line])

    response = self._Test(http.HTTP_POST, "/2/instances", headers, "")
    (status, _, payload) = response

    self.assertEqual(status, http.HttpBadRequest.code)

    expected_message = "Basic authentication requires credentials"
    self.assertEqual(payload["message"], expected_message)
def testInvalidBasicAuth(self):
    """Check that malformed Basic credentials are answered HttpUnauthorized."""
    bad_credentials = [
        "!invalid=base!64.",
        base64.b64encode(" "),
        base64.b64encode("missingcolonchar"),
    ]

    for credentials in bad_credentials:
        header_line = "%s: Basic %s" % (http.HTTP_AUTHORIZATION, credentials)
        headers = rapi.testutils._FormatHeaders([header_line])

        response = self._Test(http.HTTP_POST, "/2/instances", headers, "")
        (status, _, _) = response
        self.assertEqual(status, http.HttpUnauthorized.code)
# Builds the request headers used by testQueryAuth: a Basic Authorization
# header carrying base64("<username>:<pw>") followed by a JSON Content-Type
# header, both formatted through rapi.testutils._FormatHeaders.
# NOTE(review): the lines deriving "pw" from "password" and
# "correct_password" are not visible in this listing -- presumably "pw" is
# the real password when correct_password is true and a wrong one otherwise;
# confirm. The closing of the _FormatHeaders call and a likely @staticmethod
# decorator (the function takes no "self") are not shown either.
147 def _MakeAuthHeaders(username, password, correct_password):
153 return rapi.testutils._FormatHeaders([
154 "%s: Basic %s" % (http.HTTP_AUTHORIZATION,
155 base64.b64encode("%s:%s" % (username, pw))),
156 "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
# Exercises authentication and authorization on the "/2/query/<resource>"
# paths. For every resource in constants.QR_VIA_RAPI and every method in
# rapi.baserlib._SUPPORTED_METHODS it checks the status code of requests sent
# without credentials, with the _LookupWrongUser lookup, with a user lacking
# write access, with a wrong password, and finally with a correct password
# and write access (answered by _FakeLuxiClientForQuery).
# NOTE(review): several lines of this method are not visible in this listing
# (among them the definition of "username", parts of the two lookup helpers,
# the branch following the HttpNotImplemented check and most of the request
# preparation for PUT/GET), so the exact control flow cannot be confirmed
# from here.
159 def testQueryAuth(self):
161 password = "2046920054"
# header_fn is later called with a single boolean (True/False), which fills
# the remaining "correct_password" parameter of _MakeAuthHeaders.
163 header_fn = compat.partial(self._MakeAuthHeaders, username, password)
# Lookup returning a PasswordFileUser with an empty options list
165 def _LookupUserNoWrite(name):
167 return http.auth.PasswordFileUser(name, password, [])
# Lookup returning a PasswordFileUser whose options include
# rapi.RAPI_ACCESS_WRITE
171 def _LookupUserWithWrite(name):
173 return http.auth.PasswordFileUser(name, password, [
174 rapi.RAPI_ACCESS_WRITE,
179 for qr in constants.QR_VIA_RAPI:
180 # The /2/query resource has somewhat special rules for authentication as
181 # it can be used to retrieve critical information
182 path = "/2/query/%s" % qr
184 for method in rapi.baserlib._SUPPORTED_METHODS:
# Request without any headers, i.e. without credentials
186 (code, _, _) = self._Test(method, path, "", "")
188 if method in (http.HTTP_DELETE, http.HTTP_POST):
189 self.assertEqual(code, http.HttpNotImplemented.code)
# NOTE(review): the line(s) between the previous assertion and the next one
# are not visible -- presumably DELETE/POST skip the remaining checks; confirm.
192 self.assertEqual(code, http.HttpUnauthorized.code)
# Correct-password headers, but the user lookup is _LookupWrongUser
195 (code, _, _) = self._Test(method, path, header_fn(True), "",
196 user_fn=self._LookupWrongUser)
197 self.assertEqual(code, http.HttpUnauthorized.code)
199 # User has no write access, but the password is correct
200 (code, _, _) = self._Test(method, path, header_fn(True), "",
201 user_fn=_LookupUserNoWrite)
202 self.assertEqual(code, http.HttpForbidden.code)
204 # Wrong password and no write access
205 (code, _, _) = self._Test(method, path, header_fn(False), "",
206 user_fn=_LookupUserNoWrite)
207 self.assertEqual(code, http.HttpUnauthorized.code)
209 # Wrong password with write access
210 (code, _, _) = self._Test(method, path, header_fn(False), "",
211 user_fn=_LookupUserWithWrite)
212 self.assertEqual(code, http.HttpUnauthorized.code)
214 # Prepare request information
215 if method == http.HTTP_PUT:
217 body = serializer.DumpJson({
220 elif method == http.HTTP_GET:
221 reqpath = "%s?fields=name" % path
224 self.fail("Unknown method '%s'" % method)
226 # User has write access, password is correct
227 (code, _, data) = self._Test(method, reqpath, header_fn(True), body,
228 user_fn=_LookupUserWithWrite,
229 luxi_client=_FakeLuxiClientForQuery)
230 self.assertEqual(code, http.HTTP_OK)
# The response body must convert via objects.QueryResponse.FromDict into a
# truthy value
231 self.assertTrue(objects.QueryResponse.FromDict(data))
# Stand-in client class that testQueryAuth passes to _Test as "luxi_client".
# The constructor accepts arbitrary positional and keyword arguments; Query
# ignores its arguments and always returns an objects.QueryResponse with an
# empty field list.
# NOTE(review): the body of __init__ is not visible in this listing --
# presumably a no-op; confirm.
234 class _FakeLuxiClientForQuery:
235 def __init__(self, *args, **kwargs):
238 def Query(self, *args):
239 return objects.QueryResponse(fields=[])
# Run the tests through testutils.GanetiTestProgram when executed as a script
if __name__ == "__main__":
    testutils.GanetiTestProgram()