Revision 13bf6cd8 snf-pithos-app/pithos/api/test/views.py
b/snf-pithos-app/pithos/api/test/views.py | ||
---|---|---|
42 | 42 |
from synnefo.lib.services import get_service_path |
43 | 43 |
from synnefo.lib import join_urls |
44 | 44 |
|
45 |
from mock import patch |
|
45 |
#from mock import patch
|
|
46 | 46 |
from urllib import quote |
47 |
from urlparse import urlsplit, parse_qs |
|
48 | 47 |
|
49 | 48 |
import django.utils.simplejson as json |
49 |
from django.core.urlresolvers import reverse |
|
50 | 50 |
|
51 | 51 |
import re |
52 | 52 |
import datetime |
53 | 53 |
import time as _time |
54 | 54 |
import random |
55 |
import urllib |
|
56 |
import urlparse |
|
55 | 57 |
|
56 | 58 |
|
57 |
class NotAllowedView(PithosAPITest): |
|
58 |
def head(self, url, *args, **kwargs): |
|
59 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
60 |
m.return_value = 'token' |
|
61 |
return super(NotAllowedView, self).head(url, *args, **kwargs) |
|
62 |
|
|
63 |
def delete(self, url, *args, **kwargs): |
|
64 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
65 |
m.return_value = 'token' |
|
66 |
return super(NotAllowedView, self).delete(url, *args, **kwargs) |
|
67 |
|
|
68 |
def post(self, url, *args, **kwargs): |
|
69 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
70 |
m.return_value = 'token' |
|
71 |
return super(NotAllowedView, self).post(url, *args, **kwargs) |
|
72 |
|
|
73 |
def put(self, url, *args, **kwargs): |
|
74 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
75 |
m.return_value = 'token' |
|
76 |
return super(NotAllowedView, self).put(url, *args, **kwargs) |
|
77 |
|
|
78 |
def copy(self, url, *args, **kwargs): |
|
79 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
80 |
m.return_value = 'token' |
|
81 |
return super(NotAllowedView, self).copy(url, *args, **kwargs) |
|
82 |
|
|
83 |
def move(self, url, *args, **kwargs): |
|
84 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
85 |
m.return_value = 'token' |
|
86 |
return super(NotAllowedView, self).move(url, *args, **kwargs) |
|
59 |
def add_url_params(url, **kwargs): |
|
60 |
if not kwargs: |
|
61 |
return url |
|
62 |
parts = list(urlparse.urlsplit(url)) |
|
63 |
params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True)) |
|
64 |
params.update(kwargs) |
|
65 |
parts[3] = urllib.urlencode(params) |
|
66 |
return urlparse.urlunsplit(parts) |
|
67 |
|
|
87 | 68 |
|
69 |
class NotAllowedView(PithosAPITest): |
|
88 | 70 |
def test_not_allowed(self): |
89 | 71 |
self.view_path = join_urls(get_service_path( |
90 | 72 |
pithos_settings.pithos_services, 'pithos_ui'), 'view') |
... | ... | |
93 | 75 |
|
94 | 76 |
r = self.head(self.view_url) |
95 | 77 |
self.assertEqual(r.status_code, 405) |
96 |
self.assertTrue('Allow' in r) |
|
97 |
self.assertEqual(r['Allow'], 'GET') |
|
98 | 78 |
|
99 | 79 |
r = self.delete(self.view_url) |
100 | 80 |
self.assertEqual(r.status_code, 405) |
... | ... | |
135 | 115 |
self.api_url = join_urls(self.pithos_path, self.user, self.cname, |
136 | 116 |
self.oname) |
137 | 117 |
|
138 |
def get(self, url, user='user', *args, **kwargs): |
|
139 |
with patch("pithos.api.util.get_token_from_cookie") as m: |
|
140 |
m.return_value = 'token' |
|
141 |
return super(ObjectGetView, self).get(url, user='user', *args, |
|
142 |
**kwargs) |
|
118 |
def view(self, url, access_token='valid_token', user='user', *args, |
|
119 |
**kwargs): |
|
120 |
|
|
121 |
params = {} |
|
122 |
if access_token is not None: |
|
123 |
params['access_token'] = access_token |
|
124 |
return self.get(add_url_params(url, **params), user, *args, |
|
125 |
**kwargs) |
|
143 | 126 |
|
144 |
def test_no_cookie_redirect(self):
|
|
145 |
r = super(ObjectGetView, self).get(self.view_url)
|
|
127 |
def test_no_authorization_granted(self):
|
|
128 |
r = self.get(self.view_url)
|
|
146 | 129 |
self.assertEqual(r.status_code, 302) |
147 | 130 |
self.assertTrue('Location' in r) |
148 |
parts = list(urlsplit(r['Location'])) |
|
149 |
qs = parse_qs(parts[3]) |
|
150 |
self.assertTrue('next' in qs) |
|
151 |
self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST, |
|
152 |
self.view_url)) |
|
131 |
p = urlparse.urlparse(r['Location']) |
|
132 |
self.assertEqual(p.netloc, 'testserver') |
|
133 |
self.assertEqual(p.path, reverse('oa2_authenticate')) |
|
134 |
|
|
135 |
r = self.get(add_url_params(self.view_url, code='valid_code'), |
|
136 |
follow=True) |
|
137 |
self.assertEqual(r.status_code, 200) |
|
138 |
self.assertTrue(r.content, self.odata) |
|
139 |
intermidiate_url = r.redirect_chain[0][0] |
|
140 |
p = urlparse.urlparse(intermidiate_url) |
|
141 |
params = urlparse.parse_qs(p.query) |
|
142 |
self.assertTrue('access_token' in params) |
|
143 |
|
|
144 |
r = self.get(add_url_params(self.view_url, access_token='valid_token')) |
|
145 |
self.assertEqual(r.status_code, 200) |
|
146 |
self.assertTrue(r.content, self.odata) |
|
147 |
|
|
148 |
def test_forbidden(self): |
|
149 |
container = self.create_container(user='alice')[0] |
|
150 |
obj = self.upload_object(container, user='alice')[0] |
|
151 |
|
|
152 |
url = join_urls(self.view_path, 'alice', container, obj) |
|
153 |
r = self.view(url) |
|
154 |
self.assertEqual(r.status_code, 403) |
|
155 |
|
|
156 |
def test_shared_with_me(self): |
|
157 |
container = self.create_container(user='alice')[0] |
|
158 |
obj, data = self.upload_object(container, user='alice')[:-1] |
|
159 |
|
|
160 |
# share object |
|
161 |
url = join_urls(self.pithos_path, 'alice', container, obj) |
|
162 |
self.post(url, user='alice', content_type='', |
|
163 |
HTTP_CONTENT_RANGE='bytes */*', |
|
164 |
HTTP_X_OBJECT_SHARING='read=user') |
|
165 |
|
|
166 |
url = join_urls(self.view_path, 'alice', container, obj) |
|
167 |
r = self.view(url) |
|
168 |
self.assertEqual(r.status_code, 200) |
|
169 |
self.assertEqual(r.content, data) |
|
170 |
|
|
171 |
def test_view(self): |
|
172 |
r = self.view(self.view_url) |
|
173 |
self.assertEqual(r.status_code, 200) |
|
174 |
self.assertEqual(r.content, self.odata) |
|
175 |
|
|
176 |
def test_not_existing(self): |
|
177 |
url = self.view_url[:-1] |
|
178 |
r = self.view(url) |
|
179 |
self.assertEqual(r.status_code, 404) |
|
153 | 180 |
|
154 | 181 |
def test_versions(self): |
155 | 182 |
c = self.cname |
... | ... | |
159 | 186 |
r = self.post(self.api_url, content_type='', **meta) |
160 | 187 |
self.assertEqual(r.status_code, 202) |
161 | 188 |
|
162 |
r = self.get('%s?version=list&format=json' % self.view_url)
|
|
189 |
r = self.view('%s?version=list&format=json' % self.view_url)
|
|
163 | 190 |
self.assertEqual(r.status_code, 200) |
164 | 191 |
l1 = json.loads(r.content)['versions'] |
165 | 192 |
self.assertEqual(len(l1), 2) |
... | ... | |
171 | 198 |
self.assertEqual(r.status_code, 202) |
172 | 199 |
|
173 | 200 |
# assert a newly created version has been created |
174 |
r = self.get('%s?version=list&format=json' % self.view_url)
|
|
201 |
r = self.view('%s?version=list&format=json' % self.view_url)
|
|
175 | 202 |
self.assertEqual(r.status_code, 200) |
176 | 203 |
l2 = json.loads(r.content)['versions'] |
177 | 204 |
self.assertEqual(len(l2), len(l1) + 1) |
... | ... | |
185 | 212 |
self.append_object_data(c, o) |
186 | 213 |
|
187 | 214 |
# assert a newly created version has been created |
188 |
r = self.get('%s?version=list&format=json' % self.view_url)
|
|
215 |
r = self.view('%s?version=list&format=json' % self.view_url)
|
|
189 | 216 |
self.assertEqual(r.status_code, 200) |
190 | 217 |
l3 = json.loads(r.content)['versions'] |
191 | 218 |
self.assertEqual(len(l3), len(l2) + 1) |
... | ... | |
194 | 221 |
def test_objects_with_trailing_spaces(self): |
195 | 222 |
cname = self.cname |
196 | 223 |
|
197 |
r = self.get(quote('%s ' % self.view_url))
|
|
224 |
r = self.view(quote('%s ' % self.view_url))
|
|
198 | 225 |
self.assertEqual(r.status_code, 404) |
199 | 226 |
|
200 | 227 |
# delete object |
201 | 228 |
self.delete(self.api_url) |
202 | 229 |
|
203 |
r = self.get(self.view_url)
|
|
230 |
r = self.view(self.view_url)
|
|
204 | 231 |
self.assertEqual(r.status_code, 404) |
205 | 232 |
|
206 | 233 |
# upload object with trailing space |
207 | 234 |
oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0] |
208 | 235 |
|
209 | 236 |
view_url = join_urls(self.view_path, self.user, cname, oname) |
210 |
r = self.get(view_url)
|
|
237 |
r = self.view(view_url)
|
|
211 | 238 |
self.assertEqual(r.status_code, 200) |
212 | 239 |
|
213 | 240 |
view_url = join_urls(self.view_path, self.user, cname, oname[:-1]) |
214 |
r = self.get(view_url)
|
|
241 |
r = self.view(view_url)
|
|
215 | 242 |
self.assertEqual(r.status_code, 404) |
216 | 243 |
|
217 | 244 |
def test_get_partial(self): |
218 | 245 |
limit = pithos_settings.BACKEND_BLOCK_SIZE + 1 |
219 |
r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
|
|
246 |
r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
|
|
220 | 247 |
self.assertEqual(r.status_code, 206) |
221 | 248 |
self.assertEqual(r.content, self.odata[:limit + 1]) |
222 | 249 |
self.assertTrue('Content-Range' in r) |
... | ... | |
227 | 254 |
|
228 | 255 |
def test_get_range_not_satisfiable(self): |
229 | 256 |
# TODO |
230 |
#r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
|
|
257 |
#r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
|
|
231 | 258 |
#self.assertEqual(r.status_code, 416) |
232 | 259 |
|
233 | 260 |
offset = len(self.odata) + 1 |
234 |
r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
|
|
261 |
r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
|
|
235 | 262 |
self.assertEqual(r.status_code, 416) |
236 | 263 |
|
237 | 264 |
def test_multiple_range(self): |
238 | 265 |
l = ['0-499', '-500', '1000-'] |
239 | 266 |
ranges = 'bytes=%s' % ','.join(l) |
240 |
r = self.get(self.view_url, HTTP_RANGE=ranges)
|
|
267 |
r = self.view(self.view_url, HTTP_RANGE=ranges)
|
|
241 | 268 |
self.assertEqual(r.status_code, 206) |
242 | 269 |
self.assertTrue('content-type' in r) |
243 | 270 |
p = re.compile( |
... | ... | |
283 | 310 |
out_of_range = len(self.odata) + 1 |
284 | 311 |
l = ['0-499', '-500', '%d-' % out_of_range] |
285 | 312 |
ranges = 'bytes=%s' % ','.join(l) |
286 |
r = self.get(self.view_url, HTTP_RANGE=ranges)
|
|
313 |
r = self.view(self.view_url, HTTP_RANGE=ranges)
|
|
287 | 314 |
self.assertEqual(r.status_code, 416) |
288 | 315 |
|
289 | 316 |
def test_get_if_match(self): |
... | ... | |
292 | 319 |
else: |
293 | 320 |
etag = merkle(self.odata) |
294 | 321 |
|
295 |
r = self.get(self.view_url, HTTP_IF_MATCH=etag)
|
|
322 |
r = self.view(self.view_url, HTTP_IF_MATCH=etag)
|
|
296 | 323 |
|
297 | 324 |
# assert get success |
298 | 325 |
self.assertEqual(r.status_code, 200) |
... | ... | |
301 | 328 |
self.assertEqual(r.content, self.odata) |
302 | 329 |
|
303 | 330 |
def test_get_if_match_star(self): |
304 |
r = self.get(self.view_url, HTTP_IF_MATCH='*')
|
|
331 |
r = self.view(self.view_url, HTTP_IF_MATCH='*')
|
|
305 | 332 |
|
306 | 333 |
# assert get success |
307 | 334 |
self.assertEqual(r.status_code, 200) |
... | ... | |
316 | 343 |
etag = merkle(self.odata) |
317 | 344 |
|
318 | 345 |
quoted = lambda s: '"%s"' % s |
319 |
r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
|
|
346 |
r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
|
|
320 | 347 |
[quoted(etag), quoted(get_random_data(64))])) |
321 | 348 |
|
322 | 349 |
# assert get success |
... | ... | |
326 | 353 |
self.assertEqual(r.content, self.odata) |
327 | 354 |
|
328 | 355 |
def test_if_match_precondition_failed(self): |
329 |
r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
|
|
356 |
r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
|
|
330 | 357 |
self.assertEqual(r.status_code, 412) |
331 | 358 |
|
332 | 359 |
def test_if_none_match(self): |
... | ... | |
336 | 363 |
etag = merkle(self.odata) |
337 | 364 |
|
338 | 365 |
# perform get with If-None-Match |
339 |
r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
|
|
366 |
r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
|
|
340 | 367 |
|
341 | 368 |
# assert precondition_failed |
342 | 369 |
self.assertEqual(r.status_code, 304) |
... | ... | |
346 | 373 |
self.assertTrue(etag != r['ETag']) |
347 | 374 |
|
348 | 375 |
# perform get with If-None-Match |
349 |
r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
|
|
376 |
r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
|
|
350 | 377 |
|
351 | 378 |
# assert get success |
352 | 379 |
self.assertEqual(r.status_code, 200) |
353 | 380 |
|
354 | 381 |
def test_if_none_match_star(self): |
355 | 382 |
# perform get with If-None-Match with star |
356 |
r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
|
|
383 |
r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
|
|
357 | 384 |
self.assertEqual(r.status_code, 304) |
358 | 385 |
|
359 | 386 |
def test_if_modified_since(self): |
... | ... | |
365 | 392 |
|
366 | 393 |
# Check not modified since |
367 | 394 |
for t in t1_formats: |
368 |
r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
|
|
395 |
r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
|
|
369 | 396 |
self.assertEqual(r.status_code, 304) |
370 | 397 |
|
371 | 398 |
_time.sleep(1) |
... | ... | |
375 | 402 |
|
376 | 403 |
# Check modified since |
377 | 404 |
for t in t1_formats: |
378 |
r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
|
|
405 |
r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
|
|
379 | 406 |
self.assertEqual(r.status_code, 200) |
380 | 407 |
self.assertEqual(r.content, self.odata + appended_data) |
381 | 408 |
|
382 | 409 |
def test_if_modified_since_invalid_date(self): |
383 |
r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
|
|
410 |
r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
|
|
384 | 411 |
self.assertEqual(r.status_code, 200) |
385 | 412 |
self.assertEqual(r.content, self.odata) |
386 | 413 |
|
... | ... | |
393 | 420 |
t1 = t + datetime.timedelta(seconds=1) |
394 | 421 |
t1_formats = map(t1.strftime, DATE_FORMATS) |
395 | 422 |
for t in t1_formats: |
396 |
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
|
423 |
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
|
397 | 424 |
self.assertEqual(r.status_code, 200) |
398 | 425 |
self.assertEqual(r.content, self.odata) |
399 | 426 |
|
... | ... | |
409 | 436 |
|
410 | 437 |
# check modified |
411 | 438 |
for t in t2_formats: |
412 |
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
|
439 |
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
|
413 | 440 |
self.assertEqual(r.status_code, 412) |
414 | 441 |
|
415 | 442 |
# modify account: update object meta |
... | ... | |
424 | 451 |
|
425 | 452 |
# check modified |
426 | 453 |
for t in t3_formats: |
427 |
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
|
454 |
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
|
428 | 455 |
self.assertEqual(r.status_code, 412) |
429 | 456 |
|
430 | 457 |
def test_if_unmodified_since(self): |
... | ... | |
435 | 462 |
t_formats = map(t.strftime, DATE_FORMATS) |
436 | 463 |
|
437 | 464 |
for tf in t_formats: |
438 |
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
|
|
465 |
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
|
|
439 | 466 |
self.assertEqual(r.status_code, 200) |
440 | 467 |
self.assertEqual(r.content, self.odata) |
441 | 468 |
|
... | ... | |
447 | 474 |
t_formats = map(t.strftime, DATE_FORMATS) |
448 | 475 |
|
449 | 476 |
for tf in t_formats: |
450 |
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
|
|
477 |
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
|
|
451 | 478 |
self.assertEqual(r.status_code, 412) |
452 | 479 |
|
453 | 480 |
def test_hashes(self): |
... | ... | |
456 | 483 |
size = len(odata) |
457 | 484 |
|
458 | 485 |
view_url = join_urls(self.view_path, self.user, self.cname, oname) |
459 |
r = self.get('%s?format=json&hashmap' % view_url)
|
|
486 |
r = self.view('%s?format=json&hashmap' % view_url)
|
|
460 | 487 |
self.assertEqual(r.status_code, 200) |
461 | 488 |
body = json.loads(r.content) |
462 | 489 |
|
Also available in: Unified diff