Revision 105f0d47

b/Makefile.am
937 937
	test/ganeti.rpc_unittest.py \
938 938
	test/ganeti.runtime_unittest.py \
939 939
	test/ganeti.serializer_unittest.py \
940
	test/ganeti.server.rapi_unittest.py \
940 941
	test/ganeti.ssh_unittest.py \
941 942
	test/ganeti.storage_unittest.py \
942 943
	test/ganeti.tools.ensure_dirs_unittest.py \
b/test/ganeti.server.rapi_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for testing ganeti.server.rapi"""
23

  
24
import re
25
import unittest
26
import random
27
import mimetools
28
import base64
29
from cStringIO import StringIO
30

  
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import compat
34
from ganeti import errors
35
from ganeti import serializer
36
from ganeti import rapi
37
from ganeti import http
38
from ganeti import objects
39

  
40
import ganeti.rapi.baserlib
41
import ganeti.rapi.testutils
42
import ganeti.rapi.rlib2
43
import ganeti.http.auth
44

  
45
import testutils
46

  
47

  
48
class TestRemoteApiHandler(unittest.TestCase):
49
  @staticmethod
50
  def _LookupWrongUser(_):
51
    return None
52

  
53
  def _Test(self, method, path, headers, reqbody,
54
            user_fn=NotImplemented, luxi_client=NotImplemented):
55
    rm = rapi.testutils._RapiMock(user_fn, luxi_client)
56

  
57
    (resp_code, resp_headers, resp_body) = \
58
      rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)),
59
                       reqbody)
60

  
61
    self.assertTrue(resp_headers[http.HTTP_DATE])
62
    self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close")
63
    self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON)
64
    self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION)
65

  
66
    return (resp_code, resp_headers, serializer.LoadJson(resp_body))
67

  
68
  def testRoot(self):
69
    (code, _, data) = self._Test(http.HTTP_GET, "/", "", None)
70
    self.assertEqual(code, http.HTTP_OK)
71
    self.assertTrue(data is None)
72

  
73
  def testVersion(self):
74
    (code, _, data) = self._Test(http.HTTP_GET, "/version", "", None)
75
    self.assertEqual(code, http.HTTP_OK)
76
    self.assertEqual(data, constants.RAPI_VERSION)
77

  
78
  def testSlashTwo(self):
79
    (code, _, data) = self._Test(http.HTTP_GET, "/2", "", None)
80
    self.assertEqual(code, http.HTTP_OK)
81
    self.assertTrue(data is None)
82

  
83
  def testFeatures(self):
84
    (code, _, data) = self._Test(http.HTTP_GET, "/2/features", "", None)
85
    self.assertEqual(code, http.HTTP_OK)
86
    self.assertEqual(set(data), set(rapi.rlib2.ALL_FEATURES))
87

  
88
  def testPutInstances(self):
89
    (code, _, data) = self._Test(http.HTTP_PUT, "/2/instances", "", None)
90
    self.assertEqual(code, http.HttpNotImplemented.code)
91
    self.assertTrue(data["message"].startswith("Method PUT is unsupported"))
92

  
93
  def testPostInstancesNoAuth(self):
94
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", "", None)
95
    self.assertEqual(code, http.HttpUnauthorized.code)
96

  
97
  def testRequestWithUnsupportedMediaType(self):
98
    for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]:
99
      headers = rapi.testutils._FormatHeaders([
100
        "%s: %s" % (http.HTTP_CONTENT_TYPE, fn("un/supported/media/type")),
101
        ])
102
      (code, _, data) = self._Test(http.HTTP_GET, "/", headers, "body")
103
      self.assertEqual(code, http.HttpUnsupportedMediaType.code)
104
      self.assertEqual(data["message"], "Unsupported Media Type")
105

  
106
  def testRequestWithInvalidJsonData(self):
107
    body = "_this/is/no'valid.json"
108
    self.assertRaises(Exception, serializer.LoadJson, body)
109

  
110
    headers = rapi.testutils._FormatHeaders([
111
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
112
      ])
113

  
114
    (code, _, data) = self._Test(http.HTTP_GET, "/", headers, body)
115
    self.assertEqual(code, http.HttpBadRequest.code)
116
    self.assertEqual(data["message"], "Unable to parse JSON data")
117

  
118
  def testUnsupportedAuthScheme(self):
119
    headers = rapi.testutils._FormatHeaders([
120
      "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme"),
121
      ])
122

  
123
    (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
124
    self.assertEqual(code, http.HttpUnauthorized.code)
125

  
126
  def testIncompleteBasicAuth(self):
127
    headers = rapi.testutils._FormatHeaders([
128
      "%s: Basic" % http.HTTP_AUTHORIZATION,
129
      ])
130

  
131
    (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
132
    self.assertEqual(code, http.HttpBadRequest.code)
133
    self.assertEqual(data["message"],
134
                     "Basic authentication requires credentials")
135

  
136
  def testInvalidBasicAuth(self):
137
    for auth in ["!invalid=base!64.", base64.b64encode(" "),
138
                 base64.b64encode("missingcolonchar")]:
139
      headers = rapi.testutils._FormatHeaders([
140
        "%s: Basic %s" % (http.HTTP_AUTHORIZATION, auth),
141
        ])
142

  
143
      (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "")
144
      self.assertEqual(code, http.HttpUnauthorized.code)
145

  
146
  @staticmethod
147
  def _MakeAuthHeaders(username, password, correct_password):
148
    if correct_password:
149
      pw = password
150
    else:
151
      pw = "wrongpass"
152

  
153
    return rapi.testutils._FormatHeaders([
154
      "%s: Basic %s" % (http.HTTP_AUTHORIZATION,
155
                        base64.b64encode("%s:%s" % (username, pw))),
156
      "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON),
157
      ])
158

  
159
  def testQueryAuth(self):
160
    username = "admin"
161
    password = "2046920054"
162

  
163
    header_fn = compat.partial(self._MakeAuthHeaders, username, password)
164

  
165
    def _LookupUserNoWrite(name):
166
      if name == username:
167
        return http.auth.PasswordFileUser(name, password, [])
168
      else:
169
        return None
170

  
171
    def _LookupUserWithWrite(name):
172
      if name == username:
173
        return http.auth.PasswordFileUser(name, password, [
174
          rapi.RAPI_ACCESS_WRITE,
175
          ])
176
      else:
177
        return None
178

  
179
    for qr in constants.QR_VIA_RAPI:
180
      # The /2/query resource has somewhat special rules for authentication as
181
      # it can be used to retrieve critical information
182
      path = "/2/query/%s" % qr
183

  
184
      for method in rapi.baserlib._SUPPORTED_METHODS:
185
        # No authorization
186
        (code, _, _) = self._Test(method, path, "", "")
187

  
188
        if method in (http.HTTP_DELETE, http.HTTP_POST):
189
          self.assertEqual(code, http.HttpNotImplemented.code)
190
          continue
191

  
192
        self.assertEqual(code, http.HttpUnauthorized.code)
193

  
194
        # Incorrect user
195
        (code, _, _) = self._Test(method, path, header_fn(True), "",
196
                                  user_fn=self._LookupWrongUser)
197
        self.assertEqual(code, http.HttpUnauthorized.code)
198

  
199
        # User has no write access, but the password is correct
200
        (code, _, _) = self._Test(method, path, header_fn(True), "",
201
                                  user_fn=_LookupUserNoWrite)
202
        self.assertEqual(code, http.HttpForbidden.code)
203

  
204
        # Wrong password and no write access
205
        (code, _, _) = self._Test(method, path, header_fn(False), "",
206
                                  user_fn=_LookupUserNoWrite)
207
        self.assertEqual(code, http.HttpUnauthorized.code)
208

  
209
        # Wrong password with write access
210
        (code, _, _) = self._Test(method, path, header_fn(False), "",
211
                                  user_fn=_LookupUserWithWrite)
212
        self.assertEqual(code, http.HttpUnauthorized.code)
213

  
214
        # Prepare request information
215
        if method == http.HTTP_PUT:
216
          reqpath = path
217
          body = serializer.DumpJson({
218
            "fields": ["name"],
219
            })
220
        elif method == http.HTTP_GET:
221
          reqpath = "%s?fields=name" % path
222
          body = ""
223
        else:
224
          self.fail("Unknown method '%s'" % method)
225

  
226
        # User has write access, password is correct
227
        (code, _, data) = self._Test(method, reqpath, header_fn(True), body,
228
                                     user_fn=_LookupUserWithWrite,
229
                                     luxi_client=_FakeLuxiClientForQuery)
230
        self.assertEqual(code, http.HTTP_OK)
231
        self.assertTrue(objects.QueryResponse.FromDict(data))
232

  
233

  
234
class _FakeLuxiClientForQuery:
235
  def __init__(self, *args, **kwargs):
236
    pass
237

  
238
  def Query(self, *args):
239
    return objects.QueryResponse(fields=[])
240

  
241

  
242
if __name__ == "__main__":
243
  testutils.GanetiTestProgram()

Also available in: Unified diff