Add new test for RAPI
[ganeti-local] / test / ganeti.server.rapi_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.server.rapi"""
23
24 import re
25 import unittest
26 import random
27 import mimetools
28 import base64
29 from cStringIO import StringIO
30
31 from ganeti import constants
32 from ganeti import utils
33 from ganeti import compat
34 from ganeti import errors
35 from ganeti import serializer
36 from ganeti import rapi
37 from ganeti import http
38 from ganeti import objects
39
40 import ganeti.rapi.baserlib
41 import ganeti.rapi.testutils
42 import ganeti.rapi.rlib2
43 import ganeti.http.auth
44
45 import testutils
46
47
48 class TestRemoteApiHandler(unittest.TestCase):
49   @staticmethod
50   def _LookupWrongUser(_):
51     return None
52
53   def _Test(self, method, path, headers, reqbody,
54             user_fn=NotImplemented, luxi_client=NotImplemented):
55     rm = rapi.testutils._RapiMock(user_fn, luxi_client)
56
57     (resp_code, resp_headers, resp_body) = \
58       rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)),
59                        reqbody)
60
61     self.assertTrue(resp_headers[http.HTTP_DATE])
62     self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close")
63     self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON)
64     self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION)
65
66     return (resp_code, resp_headers, serializer.LoadJson(resp_body))
67
68   def testRoot(self):
69     (code, _, data) = self._Test(http.HTTP_GET, "/", "", None)
70     self.assertEqual(code, http.HTTP_OK)
71     self.assertTrue(data is None)
72
73   def testVersion(self):
74     (code, _, data) = self._Test(http.HTTP_GET, "/version", "", None)
75     self.assertEqual(code, http.HTTP_OK)
76     self.assertEqual(data, constants.RAPI_VERSION)
77
78   def testSlashTwo(self):
79     (code, _, data) = self._Test(http.HTTP_GET, "/2", "", None)
80     self.assertEqual(code, http.HTTP_OK)
81     self.assertTrue(data is None)
82
83   def testFeatures(self):
84     (code, _, data) = self._Test(http.HTTP_GET, "/2/features", "", None)
85     self.assertEqual(code, http.HTTP_OK)
86     self.assertEqual(set(data), set(rapi.rlib2.ALL_FEATURES))
87
88   def testPutInstances(self):
89     (code, _, data) = self._Test(http.HTTP_PUT, "/2/instances", "", None)
90     self.assertEqual(code, http.HttpNotImplemented.code)
91     self.assertTrue(data["message"].startswith("Method PUT is unsupported"))
92
93   def testPostInstancesNoAuth(self):
94     (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", "", None)
95     self.assertEqual(code, http.HttpUnauthorized.code)
96
97   def testRequestWithUnsupportedMediaType(self):
98     for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]:
99       headers = rapi.testutils._FormatHeaders([
100         "%s: %s" % (http.HTTP_CONTENT_TYPE, fn("un/supported/media/type")),
101         ])
102       (code, _, data) = self._Test(http.HTTP_GET, "/", headers, "body")
103       self.assertEqual(code, http.HttpUnsupportedMediaType.code)
104       self.assertEqual(data["message"], "Unsupported Media Type")
105
106   def testRequestWithInvalidJsonData(self):
107     body = "_this/is/no'valid.json"
108     self.assertRaises(Exception, serializer.LoadJson, body)
109
110     headers = rapi.testutils._FormatHeaders([
111       "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
112       ])
113
114     (code, _, data) = self._Test(http.HTTP_GET, "/", headers, body)
115     self.assertEqual(code, http.HttpBadRequest.code)
116     self.assertEqual(data["message"], "Unable to parse JSON data")
117
118   def testUnsupportedAuthScheme(self):
119     headers = rapi.testutils._FormatHeaders([
120       "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme"),
121       ])
122
123     (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
124     self.assertEqual(code, http.HttpUnauthorized.code)
125
126   def testIncompleteBasicAuth(self):
127     headers = rapi.testutils._FormatHeaders([
128       "%s: Basic" % http.HTTP_AUTHORIZATION,
129       ])
130
131     (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
132     self.assertEqual(code, http.HttpBadRequest.code)
133     self.assertEqual(data["message"],
134                      "Basic authentication requires credentials")
135
136   def testInvalidBasicAuth(self):
137     for auth in ["!invalid=base!64.", base64.b64encode(" "),
138                  base64.b64encode("missingcolonchar")]:
139       headers = rapi.testutils._FormatHeaders([
140         "%s: Basic %s" % (http.HTTP_AUTHORIZATION, auth),
141         ])
142
143       (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
144       self.assertEqual(code, http.HttpUnauthorized.code)
145
146   @staticmethod
147   def _MakeAuthHeaders(username, password, correct_password):
148     if correct_password:
149       pw = password
150     else:
151       pw = "wrongpass"
152
153     return rapi.testutils._FormatHeaders([
154       "%s: Basic %s" % (http.HTTP_AUTHORIZATION,
155                         base64.b64encode("%s:%s" % (username, pw))),
156       "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
157       ])
158
159   def testQueryAuth(self):
160     username = "admin"
161     password = "2046920054"
162
163     header_fn = compat.partial(self._MakeAuthHeaders, username, password)
164
165     def _LookupUserNoWrite(name):
166       if name == username:
167         return http.auth.PasswordFileUser(name, password, [])
168       else:
169         return None
170
171     def _LookupUserWithWrite(name):
172       if name == username:
173         return http.auth.PasswordFileUser(name, password, [
174           rapi.RAPI_ACCESS_WRITE,
175           ])
176       else:
177         return None
178
179     for qr in constants.QR_VIA_RAPI:
180       # The /2/query resource has somewhat special rules for authentication as
181       # it can be used to retrieve critical information
182       path = "/2/query/%s" % qr
183
184       for method in rapi.baserlib._SUPPORTED_METHODS:
185         # No authorization
186         (code, _, _) = self._Test(method, path, "", "")
187
188         if method in (http.HTTP_DELETE, http.HTTP_POST):
189           self.assertEqual(code, http.HttpNotImplemented.code)
190           continue
191
192         self.assertEqual(code, http.HttpUnauthorized.code)
193
194         # Incorrect user
195         (code, _, _) = self._Test(method, path, header_fn(True), "",
196                                   user_fn=self._LookupWrongUser)
197         self.assertEqual(code, http.HttpUnauthorized.code)
198
199         # User has no write access, but the password is correct
200         (code, _, _) = self._Test(method, path, header_fn(True), "",
201                                   user_fn=_LookupUserNoWrite)
202         self.assertEqual(code, http.HttpForbidden.code)
203
204         # Wrong password and no write access
205         (code, _, _) = self._Test(method, path, header_fn(False), "",
206                                   user_fn=_LookupUserNoWrite)
207         self.assertEqual(code, http.HttpUnauthorized.code)
208
209         # Wrong password with write access
210         (code, _, _) = self._Test(method, path, header_fn(False), "",
211                                   user_fn=_LookupUserWithWrite)
212         self.assertEqual(code, http.HttpUnauthorized.code)
213
214         # Prepare request information
215         if method == http.HTTP_PUT:
216           reqpath = path
217           body = serializer.DumpJson({
218             "fields": ["name"],
219             })
220         elif method == http.HTTP_GET:
221           reqpath = "%s?fields=name" % path
222           body = ""
223         else:
224           self.fail("Unknown method '%s'" % method)
225
226         # User has write access, password is correct
227         (code, _, data) = self._Test(method, reqpath, header_fn(True), body,
228                                      user_fn=_LookupUserWithWrite,
229                                      luxi_client=_FakeLuxiClientForQuery)
230         self.assertEqual(code, http.HTTP_OK)
231         self.assertTrue(objects.QueryResponse.FromDict(data))
232
233
234 class _FakeLuxiClientForQuery:
235   def __init__(self, *args, **kwargs):
236     pass
237
238   def Query(self, *args):
239     return objects.QueryResponse(fields=[])
240
241
# When executed as a script, hand control to the test program provided by the
# local testutils module
if __name__ == "__main__":
  testutils.GanetiTestProgram()