Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.server.rapi_unittest.py @ 14933c17

History | View | Annotate | Download (9.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.server.rapi"""
23

    
24
import re
25
import unittest
26
import random
27
import mimetools
28
import base64
29
from cStringIO import StringIO
30

    
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import compat
34
from ganeti import errors
35
from ganeti import serializer
36
from ganeti import rapi
37
from ganeti import http
38
from ganeti import objects
39

    
40
import ganeti.rapi.baserlib
41
import ganeti.rapi.testutils
42
import ganeti.rapi.rlib2
43
import ganeti.http.auth
44

    
45
import testutils
46

    
47

    
48
class TestRemoteApiHandler(unittest.TestCase):
49
  @staticmethod
50
  def _LookupWrongUser(_):
51
    return None
52

    
53
  def _Test(self, method, path, headers, reqbody,
54
            user_fn=NotImplemented, luxi_client=NotImplemented,
55
            reqauth=False):
56
    rm = rapi.testutils._RapiMock(user_fn, luxi_client, reqauth=reqauth)
57

    
58
    (resp_code, resp_headers, resp_body) = \
59
      rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)),
60
                       reqbody)
61

    
62
    self.assertTrue(resp_headers[http.HTTP_DATE])
63
    self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close")
64
    self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON)
65
    self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION)
66

    
67
    return (resp_code, resp_headers, serializer.LoadJson(resp_body))
68

    
69
  def testRoot(self):
70
    (code, _, data) = self._Test(http.HTTP_GET, "/", "", None)
71
    self.assertEqual(code, http.HTTP_OK)
72
    self.assertTrue(data is None)
73

    
74
  def testRootReqAuth(self):
75
    (code, _, _) = self._Test(http.HTTP_GET, "/", "", None, reqauth=True)
76
    self.assertEqual(code, http.HttpUnauthorized.code)
77

    
78
  def testVersion(self):
79
    (code, _, data) = self._Test(http.HTTP_GET, "/version", "", None)
80
    self.assertEqual(code, http.HTTP_OK)
81
    self.assertEqual(data, constants.RAPI_VERSION)
82

    
83
  def testSlashTwo(self):
84
    (code, _, data) = self._Test(http.HTTP_GET, "/2", "", None)
85
    self.assertEqual(code, http.HTTP_OK)
86
    self.assertTrue(data is None)
87

    
88
  def testFeatures(self):
89
    (code, _, data) = self._Test(http.HTTP_GET, "/2/features", "", None)
90
    self.assertEqual(code, http.HTTP_OK)
91
    self.assertEqual(set(data), set(rapi.rlib2.ALL_FEATURES))
92

    
93
  def testPutInstances(self):
94
    (code, _, data) = self._Test(http.HTTP_PUT, "/2/instances", "", None)
95
    self.assertEqual(code, http.HttpNotImplemented.code)
96
    self.assertTrue(data["message"].startswith("Method PUT is unsupported"))
97

    
98
  def testPostInstancesNoAuth(self):
99
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", "", None)
100
    self.assertEqual(code, http.HttpUnauthorized.code)
101

    
102
  def testRequestWithUnsupportedMediaType(self):
103
    for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]:
104
      headers = rapi.testutils._FormatHeaders([
105
        "%s: %s" % (http.HTTP_CONTENT_TYPE, fn("un/supported/media/type")),
106
        ])
107
      (code, _, data) = self._Test(http.HTTP_GET, "/", headers, "body")
108
      self.assertEqual(code, http.HttpUnsupportedMediaType.code)
109
      self.assertEqual(data["message"], "Unsupported Media Type")
110

    
111
  def testRequestWithInvalidJsonData(self):
112
    body = "_this/is/no'valid.json"
113
    self.assertRaises(Exception, serializer.LoadJson, body)
114

    
115
    headers = rapi.testutils._FormatHeaders([
116
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
117
      ])
118

    
119
    (code, _, data) = self._Test(http.HTTP_GET, "/", headers, body)
120
    self.assertEqual(code, http.HttpBadRequest.code)
121
    self.assertEqual(data["message"], "Unable to parse JSON data")
122

    
123
  def testUnsupportedAuthScheme(self):
124
    headers = rapi.testutils._FormatHeaders([
125
      "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme"),
126
      ])
127

    
128
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
129
    self.assertEqual(code, http.HttpUnauthorized.code)
130

    
131
  def testIncompleteBasicAuth(self):
132
    headers = rapi.testutils._FormatHeaders([
133
      "%s: Basic" % http.HTTP_AUTHORIZATION,
134
      ])
135

    
136
    (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
137
    self.assertEqual(code, http.HttpBadRequest.code)
138
    self.assertEqual(data["message"],
139
                     "Basic authentication requires credentials")
140

    
141
  def testInvalidBasicAuth(self):
142
    for auth in ["!invalid=base!64.", base64.b64encode(" "),
143
                 base64.b64encode("missingcolonchar")]:
144
      headers = rapi.testutils._FormatHeaders([
145
        "%s: Basic %s" % (http.HTTP_AUTHORIZATION, auth),
146
        ])
147

    
148
      (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
149
      self.assertEqual(code, http.HttpUnauthorized.code)
150

    
151
  @staticmethod
152
  def _MakeAuthHeaders(username, password, correct_password):
153
    if correct_password:
154
      pw = password
155
    else:
156
      pw = "wrongpass"
157

    
158
    return rapi.testutils._FormatHeaders([
159
      "%s: Basic %s" % (http.HTTP_AUTHORIZATION,
160
                        base64.b64encode("%s:%s" % (username, pw))),
161
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
162
      ])
163

    
164
  def testQueryAuth(self):
165
    username = "admin"
166
    password = "2046920054"
167

    
168
    header_fn = compat.partial(self._MakeAuthHeaders, username, password)
169

    
170
    def _LookupUserNoWrite(name):
171
      if name == username:
172
        return http.auth.PasswordFileUser(name, password, [])
173
      else:
174
        return None
175

    
176
    for access in [rapi.RAPI_ACCESS_WRITE, rapi.RAPI_ACCESS_READ]:
177
      def _LookupUserWithWrite(name):
178
        if name == username:
179
          return http.auth.PasswordFileUser(name, password, [
180
            access,
181
            ])
182
        else:
183
          return None
184

    
185
      for qr in constants.QR_VIA_RAPI:
186
        # The /2/query resource has somewhat special rules for authentication as
187
        # it can be used to retrieve critical information
188
        path = "/2/query/%s" % qr
189

    
190
        for method in rapi.baserlib._SUPPORTED_METHODS:
191
          # No authorization
192
          (code, _, _) = self._Test(method, path, "", "")
193

    
194
          if method in (http.HTTP_DELETE, http.HTTP_POST):
195
            self.assertEqual(code, http.HttpNotImplemented.code)
196
            continue
197

    
198
          self.assertEqual(code, http.HttpUnauthorized.code)
199

    
200
          # Incorrect user
201
          (code, _, _) = self._Test(method, path, header_fn(True), "",
202
                                    user_fn=self._LookupWrongUser)
203
          self.assertEqual(code, http.HttpUnauthorized.code)
204

    
205
          # User has no write access, but the password is correct
206
          (code, _, _) = self._Test(method, path, header_fn(True), "",
207
                                    user_fn=_LookupUserNoWrite)
208
          self.assertEqual(code, http.HttpForbidden.code)
209

    
210
          # Wrong password and no write access
211
          (code, _, _) = self._Test(method, path, header_fn(False), "",
212
                                    user_fn=_LookupUserNoWrite)
213
          self.assertEqual(code, http.HttpUnauthorized.code)
214

    
215
          # Wrong password with write access
216
          (code, _, _) = self._Test(method, path, header_fn(False), "",
217
                                    user_fn=_LookupUserWithWrite)
218
          self.assertEqual(code, http.HttpUnauthorized.code)
219

    
220
          # Prepare request information
221
          if method == http.HTTP_PUT:
222
            reqpath = path
223
            body = serializer.DumpJson({
224
              "fields": ["name"],
225
              })
226
          elif method == http.HTTP_GET:
227
            reqpath = "%s?fields=name" % path
228
            body = ""
229
          else:
230
            self.fail("Unknown method '%s'" % method)
231

    
232
          # User has write access, password is correct
233
          (code, _, data) = self._Test(method, reqpath, header_fn(True), body,
234
                                       user_fn=_LookupUserWithWrite,
235
                                       luxi_client=_FakeLuxiClientForQuery)
236
          self.assertEqual(code, http.HTTP_OK)
237
          self.assertTrue(objects.QueryResponse.FromDict(data))
238

    
239
  def testConsole(self):
240
    path = "/2/instances/inst1.example.com/console"
241

    
242
    for method in rapi.baserlib._SUPPORTED_METHODS:
243
      for reqauth in [False, True]:
244
        # No authorization
245
        (code, _, _) = self._Test(method, path, "", "", reqauth=reqauth)
246

    
247
        if method == http.HTTP_GET or reqauth:
248
          self.assertEqual(code, http.HttpUnauthorized.code)
249
        else:
250
          self.assertEqual(code, http.HttpNotImplemented.code)
251

    
252

    
253
class _FakeLuxiClientForQuery:
254
  def __init__(self, *args, **kwargs):
255
    pass
256

    
257
  def Query(self, *args):
258
    return objects.QueryResponse(fields=[])
259

    
260

    
261
if __name__ == "__main__":
262
  testutils.GanetiTestProgram()