4 # Copyright (C) 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.server.rapi"""
29 from cStringIO import StringIO
31 from ganeti import constants
32 from ganeti import utils
33 from ganeti import compat
34 from ganeti import errors
35 from ganeti import serializer
36 from ganeti import rapi
37 from ganeti import http
38 from ganeti import objects
40 import ganeti.rapi.baserlib
41 import ganeti.rapi.testutils
42 import ganeti.rapi.rlib2
43 import ganeti.http.auth
class TestRemoteApiHandler(unittest.TestCase):
  """Tests for the RAPI request handler, driven through the RAPI mock.

  Every request goes through L{_Test}, which builds a fresh
  C{rapi.testutils._RapiMock} and decodes the JSON response body.

  NOTE(review): the reviewed copy of this class had gaps (missing closing
  brackets, C{else:} branches, small assignments and decorators). Lines that
  had to be restored are marked C{NOTE(review)}; confirm them against
  version control.

  """
  # NOTE(review): decorator and body restored. The callback is passed as
  # "user_fn" via "self._LookupWrongUser" yet takes a single argument, so it
  # has to be static; returning None (user unknown) is assumed from the 401
  # expected in testQueryAuth -- confirm.
  @staticmethod
  def _LookupWrongUser(_):
    """User lookup callback which never finds a user."""
    return None

  def _Test(self, method, path, headers, reqbody,
            user_fn=NotImplemented, luxi_client=NotImplemented):
    """Sends a single request through the RAPI mock.

    Besides fetching the response, this asserts the headers that every
    response is expected to carry (date, "Connection: close", JSON content
    type and the server version).

    Parameters:
      - method: HTTP method of the request
      - path: request path, optionally with a query string
      - headers: raw header text; parsed with C{http.ParseHeaders}
      - reqbody: request body handed to the mock
      - user_fn: user lookup callback handed to the mock
      - luxi_client: LUXI client handed to the mock

    Returns a tuple of (status code, response headers, JSON-decoded body).

    """
    rm = rapi.testutils._RapiMock(user_fn, luxi_client)

    # NOTE(review): the trailing "reqbody" argument was restored (the call was
    # left unterminated and the parameter was otherwise unused) -- confirm.
    (resp_code, resp_headers, resp_body) = \
      rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)),
                       reqbody)

    self.assertTrue(resp_headers[http.HTTP_DATE])
    self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close")
    self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON)
    self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION)

    return (resp_code, resp_headers, serializer.LoadJson(resp_body))

  # NOTE(review): the "def" line of this test was missing (its statements
  # followed the "return" of _Test directly); the name "testRoot" is assumed.
  def testRoot(self):
    """GET on "/" succeeds and returns no data."""
    (code, _, data) = self._Test(http.HTTP_GET, "/", "", None)
    self.assertEqual(code, http.HTTP_OK)
    self.assertTrue(data is None)

  def testVersion(self):
    """GET on "/version" returns the RAPI version constant."""
    (code, _, data) = self._Test(http.HTTP_GET, "/version", "", None)
    self.assertEqual(code, http.HTTP_OK)
    self.assertEqual(data, constants.RAPI_VERSION)

  def testSlashTwo(self):
    """GET on "/2" succeeds and returns no data."""
    (code, _, data) = self._Test(http.HTTP_GET, "/2", "", None)
    self.assertEqual(code, http.HTTP_OK)
    self.assertTrue(data is None)

  def testFeatures(self):
    """GET on "/2/features" returns exactly the features known to rlib2."""
    (code, _, data) = self._Test(http.HTTP_GET, "/2/features", "", None)
    self.assertEqual(code, http.HTTP_OK)
    self.assertEqual(set(data), set(rapi.rlib2.ALL_FEATURES))

  def testPutInstances(self):
    """PUT on "/2/instances" is rejected as not implemented."""
    (code, _, data) = self._Test(http.HTTP_PUT, "/2/instances", "", None)
    self.assertEqual(code, http.HttpNotImplemented.code)
    self.assertTrue(data["message"].startswith("Method PUT is unsupported"))

  def testPostInstancesNoAuth(self):
    """POST on "/2/instances" without credentials is unauthorized."""
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", "", None)
    self.assertEqual(code, http.HttpUnauthorized.code)

  def testRequestWithUnsupportedMediaType(self):
    """A body with an unknown content type is rejected in any letter case."""
    for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]:
      headers = rapi.testutils._FormatHeaders([
        "%s: %s" % (http.HTTP_CONTENT_TYPE, fn("un/supported/media/type")),
        ])
      (code, _, data) = self._Test(http.HTTP_GET, "/", headers, "body")
      self.assertEqual(code, http.HttpUnsupportedMediaType.code)
      self.assertEqual(data["message"], "Unsupported Media Type")

  def testRequestWithInvalidJsonData(self):
    """A JSON-typed body which does not parse yields "bad request"."""
    body = "_this/is/no'valid.json"
    # Make sure the body really is invalid before sending it
    self.assertRaises(Exception, serializer.LoadJson, body)

    headers = rapi.testutils._FormatHeaders([
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
      ])

    (code, _, data) = self._Test(http.HTTP_GET, "/", headers, body)
    self.assertEqual(code, http.HttpBadRequest.code)
    self.assertEqual(data["message"], "Unable to parse JSON data")

  def testUnsupportedAuthScheme(self):
    """An unknown authorization scheme is treated as unauthorized."""
    headers = rapi.testutils._FormatHeaders([
      "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme"),
      ])

    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
    self.assertEqual(code, http.HttpUnauthorized.code)

  def testIncompleteBasicAuth(self):
    """"Basic" without any credentials yields "bad request"."""
    headers = rapi.testutils._FormatHeaders([
      "%s: Basic" % http.HTTP_AUTHORIZATION,
      ])

    (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
    self.assertEqual(code, http.HttpBadRequest.code)
    self.assertEqual(data["message"],
                     "Basic authentication requires credentials")

  def testInvalidBasicAuth(self):
    """Malformed basic credentials are treated as unauthorized.

    Covers a value which is not base64, a blank value and a value lacking
    the colon between user name and password.

    """
    for auth in ["!invalid=base!64.", base64.b64encode(" "),
                 base64.b64encode("missingcolonchar")]:
      headers = rapi.testutils._FormatHeaders([
        "%s: Basic %s" % (http.HTTP_AUTHORIZATION, auth),
        ])

      (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
      self.assertEqual(code, http.HttpUnauthorized.code)

  # NOTE(review): decorator restored; the function takes no "self" and is
  # bound with compat.partial via "self._MakeAuthHeaders" in testQueryAuth.
  @staticmethod
  def _MakeAuthHeaders(username, password, correct_password):
    """Builds JSON request headers carrying basic authentication.

    Parameters:
      - username: user name to send
      - password: the user's real password
      - correct_password: whether to send the real password or a wrong one

    Returns the formatted header text.

    """
    # NOTE(review): the selection of "pw" was missing and has been restored;
    # the exact wrong-password literal is assumed -- confirm.
    if correct_password:
      pw = password
    else:
      pw = "wrong password"

    return rapi.testutils._FormatHeaders([
      "%s: Basic %s" % (http.HTTP_AUTHORIZATION,
                        base64.b64encode("%s:%s" % (username, pw))),
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
      ])

  def testQueryAuth(self):
    """Checks authentication and authorization of the "/2/query" resources.

    For every query resource and every supported method this walks through
    missing credentials, an unknown user, wrong passwords and a user without
    the needed access before finally issuing a successful query.

    """
    # NOTE(review): the user name assignment was missing; any fixed name
    # works as long as the lookup callbacks below compare against it.
    username = "admin"
    password = "2046920054"

    header_fn = compat.partial(self._MakeAuthHeaders, username, password)

    # NOTE(review): in both lookup callbacks only the "return
    # http.auth.PasswordFileUser(...)" lines were present; the user name
    # check and the "return None" fallback are restored -- confirm.
    def _LookupUserNoWrite(name):
      if name == username:
        return http.auth.PasswordFileUser(name, password, [])
      else:
        return None

    for access in [rapi.RAPI_ACCESS_WRITE, rapi.RAPI_ACCESS_READ]:
      def _LookupUserWithWrite(name):
        if name == username:
          # NOTE(review): the option list content was missing; passing the
          # loop's "access" value is assumed (it is otherwise unused).
          return http.auth.PasswordFileUser(name, password, [
            access,
            ])
        else:
          return None

      for qr in constants.QR_VIA_RAPI:
        # The /2/query resource has somewhat special rules for authentication as
        # it can be used to retrieve critical information
        path = "/2/query/%s" % qr

        for method in rapi.baserlib._SUPPORTED_METHODS:
          # Request without any credentials
          (code, _, _) = self._Test(method, path, "", "")

          if method in (http.HTTP_DELETE, http.HTTP_POST):
            self.assertEqual(code, http.HttpNotImplemented.code)
            # NOTE(review): "continue" restored; without it the assertion
            # below would contradict the one above for these methods.
            continue

          self.assertEqual(code, http.HttpUnauthorized.code)

          # User is not known at all
          (code, _, _) = self._Test(method, path, header_fn(True), "",
                                    user_fn=self._LookupWrongUser)
          self.assertEqual(code, http.HttpUnauthorized.code)

          # User has no write access, but the password is correct
          (code, _, _) = self._Test(method, path, header_fn(True), "",
                                    user_fn=_LookupUserNoWrite)
          self.assertEqual(code, http.HttpForbidden.code)

          # Wrong password and no write access
          (code, _, _) = self._Test(method, path, header_fn(False), "",
                                    user_fn=_LookupUserNoWrite)
          self.assertEqual(code, http.HttpUnauthorized.code)

          # Wrong password with write access
          (code, _, _) = self._Test(method, path, header_fn(False), "",
                                    user_fn=_LookupUserWithWrite)
          self.assertEqual(code, http.HttpUnauthorized.code)

          # Prepare request information
          # NOTE(review): the "reqpath"/"body" assignments, the PUT body
          # content and the final "else:" were missing; the PUT body is
          # assumed to mirror the GET query string ("fields=name").
          if method == http.HTTP_PUT:
            reqpath = path
            body = serializer.DumpJson({
              "fields": ["name"],
              })
          elif method == http.HTTP_GET:
            reqpath = "%s?fields=name" % path
            body = ""
          else:
            self.fail("Unknown method '%s'" % method)

          # User has write access, password is correct
          (code, _, data) = self._Test(method, reqpath, header_fn(True), body,
                                       user_fn=_LookupUserWithWrite,
                                       luxi_client=_FakeLuxiClientForQuery)
          self.assertEqual(code, http.HTTP_OK)
          self.assertTrue(objects.QueryResponse.FromDict(data))

  def testConsole(self):
    """The instance console resource demands authentication for GET only.

    All other supported methods are expected to be not implemented.

    """
    path = "/2/instances/inst1.example.com/console"

    for method in rapi.baserlib._SUPPORTED_METHODS:
      # Request without any credentials
      (code, _, _) = self._Test(method, path, "", "")

      if method == http.HTTP_GET:
        self.assertEqual(code, http.HttpUnauthorized.code)
      else:
        # NOTE(review): "else:" restored; the two assertions are mutually
        # exclusive and cannot both apply to the same response.
        self.assertEqual(code, http.HttpNotImplemented.code)
247 class _FakeLuxiClientForQuery:
248 def __init__(self, *args, **kwargs):
251 def Query(self, *args):
252 return objects.QueryResponse(fields=[])
# When run as a script (not imported), hand control to
# testutils.GanetiTestProgram -- presumably the project's unittest runner.
# NOTE(review): "testutils" is not imported in the lines visible here; confirm
# the module-level "import testutils" is present.
if __name__ == "__main__":
  testutils.GanetiTestProgram()