Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.server.rapi_unittest.py @ 105f0d47

History | View | Annotate | Download (8.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.server.rapi"""
23

    
24
import re
25
import unittest
26
import random
27
import mimetools
28
import base64
29
from cStringIO import StringIO
30

    
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import compat
34
from ganeti import errors
35
from ganeti import serializer
36
from ganeti import rapi
37
from ganeti import http
38
from ganeti import objects
39

    
40
import ganeti.rapi.baserlib
41
import ganeti.rapi.testutils
42
import ganeti.rapi.rlib2
43
import ganeti.http.auth
44

    
45
import testutils
46

    
47

    
48
class TestRemoteApiHandler(unittest.TestCase):
49
  @staticmethod
50
  def _LookupWrongUser(_):
51
    return None
52

    
53
  def _Test(self, method, path, headers, reqbody,
54
            user_fn=NotImplemented, luxi_client=NotImplemented):
55
    rm = rapi.testutils._RapiMock(user_fn, luxi_client)
56

    
57
    (resp_code, resp_headers, resp_body) = \
58
      rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)),
59
                       reqbody)
60

    
61
    self.assertTrue(resp_headers[http.HTTP_DATE])
62
    self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close")
63
    self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON)
64
    self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION)
65

    
66
    return (resp_code, resp_headers, serializer.LoadJson(resp_body))
67

    
68
  def testRoot(self):
69
    (code, _, data) = self._Test(http.HTTP_GET, "/", "", None)
70
    self.assertEqual(code, http.HTTP_OK)
71
    self.assertTrue(data is None)
72

    
73
  def testVersion(self):
74
    (code, _, data) = self._Test(http.HTTP_GET, "/version", "", None)
75
    self.assertEqual(code, http.HTTP_OK)
76
    self.assertEqual(data, constants.RAPI_VERSION)
77

    
78
  def testSlashTwo(self):
79
    (code, _, data) = self._Test(http.HTTP_GET, "/2", "", None)
80
    self.assertEqual(code, http.HTTP_OK)
81
    self.assertTrue(data is None)
82

    
83
  def testFeatures(self):
84
    (code, _, data) = self._Test(http.HTTP_GET, "/2/features", "", None)
85
    self.assertEqual(code, http.HTTP_OK)
86
    self.assertEqual(set(data), set(rapi.rlib2.ALL_FEATURES))
87

    
88
  def testPutInstances(self):
89
    (code, _, data) = self._Test(http.HTTP_PUT, "/2/instances", "", None)
90
    self.assertEqual(code, http.HttpNotImplemented.code)
91
    self.assertTrue(data["message"].startswith("Method PUT is unsupported"))
92

    
93
  def testPostInstancesNoAuth(self):
94
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", "", None)
95
    self.assertEqual(code, http.HttpUnauthorized.code)
96

    
97
  def testRequestWithUnsupportedMediaType(self):
98
    for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]:
99
      headers = rapi.testutils._FormatHeaders([
100
        "%s: %s" % (http.HTTP_CONTENT_TYPE, fn("un/supported/media/type")),
101
        ])
102
      (code, _, data) = self._Test(http.HTTP_GET, "/", headers, "body")
103
      self.assertEqual(code, http.HttpUnsupportedMediaType.code)
104
      self.assertEqual(data["message"], "Unsupported Media Type")
105

    
106
  def testRequestWithInvalidJsonData(self):
107
    body = "_this/is/no'valid.json"
108
    self.assertRaises(Exception, serializer.LoadJson, body)
109

    
110
    headers = rapi.testutils._FormatHeaders([
111
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
112
      ])
113

    
114
    (code, _, data) = self._Test(http.HTTP_GET, "/", headers, body)
115
    self.assertEqual(code, http.HttpBadRequest.code)
116
    self.assertEqual(data["message"], "Unable to parse JSON data")
117

    
118
  def testUnsupportedAuthScheme(self):
119
    headers = rapi.testutils._FormatHeaders([
120
      "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme"),
121
      ])
122

    
123
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
124
    self.assertEqual(code, http.HttpUnauthorized.code)
125

    
126
  def testIncompleteBasicAuth(self):
127
    headers = rapi.testutils._FormatHeaders([
128
      "%s: Basic" % http.HTTP_AUTHORIZATION,
129
      ])
130

    
131
    (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
132
    self.assertEqual(code, http.HttpBadRequest.code)
133
    self.assertEqual(data["message"],
134
                     "Basic authentication requires credentials")
135

    
136
  def testInvalidBasicAuth(self):
137
    for auth in ["!invalid=base!64.", base64.b64encode(" "),
138
                 base64.b64encode("missingcolonchar")]:
139
      headers = rapi.testutils._FormatHeaders([
140
        "%s: Basic %s" % (http.HTTP_AUTHORIZATION, auth),
141
        ])
142

    
143
      (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
144
      self.assertEqual(code, http.HttpUnauthorized.code)
145

    
146
  @staticmethod
147
  def _MakeAuthHeaders(username, password, correct_password):
148
    if correct_password:
149
      pw = password
150
    else:
151
      pw = "wrongpass"
152

    
153
    return rapi.testutils._FormatHeaders([
154
      "%s: Basic %s" % (http.HTTP_AUTHORIZATION,
155
                        base64.b64encode("%s:%s" % (username, pw))),
156
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
157
      ])
158

    
159
  def testQueryAuth(self):
160
    username = "admin"
161
    password = "2046920054"
162

    
163
    header_fn = compat.partial(self._MakeAuthHeaders, username, password)
164

    
165
    def _LookupUserNoWrite(name):
166
      if name == username:
167
        return http.auth.PasswordFileUser(name, password, [])
168
      else:
169
        return None
170

    
171
    def _LookupUserWithWrite(name):
172
      if name == username:
173
        return http.auth.PasswordFileUser(name, password, [
174
          rapi.RAPI_ACCESS_WRITE,
175
          ])
176
      else:
177
        return None
178

    
179
    for qr in constants.QR_VIA_RAPI:
180
      # The /2/query resource has somewhat special rules for authentication as
181
      # it can be used to retrieve critical information
182
      path = "/2/query/%s" % qr
183

    
184
      for method in rapi.baserlib._SUPPORTED_METHODS:
185
        # No authorization
186
        (code, _, _) = self._Test(method, path, "", "")
187

    
188
        if method in (http.HTTP_DELETE, http.HTTP_POST):
189
          self.assertEqual(code, http.HttpNotImplemented.code)
190
          continue
191

    
192
        self.assertEqual(code, http.HttpUnauthorized.code)
193

    
194
        # Incorrect user
195
        (code, _, _) = self._Test(method, path, header_fn(True), "",
196
                                  user_fn=self._LookupWrongUser)
197
        self.assertEqual(code, http.HttpUnauthorized.code)
198

    
199
        # User has no write access, but the password is correct
200
        (code, _, _) = self._Test(method, path, header_fn(True), "",
201
                                  user_fn=_LookupUserNoWrite)
202
        self.assertEqual(code, http.HttpForbidden.code)
203

    
204
        # Wrong password and no write access
205
        (code, _, _) = self._Test(method, path, header_fn(False), "",
206
                                  user_fn=_LookupUserNoWrite)
207
        self.assertEqual(code, http.HttpUnauthorized.code)
208

    
209
        # Wrong password with write access
210
        (code, _, _) = self._Test(method, path, header_fn(False), "",
211
                                  user_fn=_LookupUserWithWrite)
212
        self.assertEqual(code, http.HttpUnauthorized.code)
213

    
214
        # Prepare request information
215
        if method == http.HTTP_PUT:
216
          reqpath = path
217
          body = serializer.DumpJson({
218
            "fields": ["name"],
219
            })
220
        elif method == http.HTTP_GET:
221
          reqpath = "%s?fields=name" % path
222
          body = ""
223
        else:
224
          self.fail("Unknown method '%s'" % method)
225

    
226
        # User has write access, password is correct
227
        (code, _, data) = self._Test(method, reqpath, header_fn(True), body,
228
                                     user_fn=_LookupUserWithWrite,
229
                                     luxi_client=_FakeLuxiClientForQuery)
230
        self.assertEqual(code, http.HTTP_OK)
231
        self.assertTrue(objects.QueryResponse.FromDict(data))
232

    
233

    
234
class _FakeLuxiClientForQuery:
235
  def __init__(self, *args, **kwargs):
236
    pass
237

    
238
  def Query(self, *args):
239
    return objects.QueryResponse(fields=[])
240

    
241

    
242
if __name__ == "__main__":
243
  testutils.GanetiTestProgram()