Revision 13bf6cd8 snf-pithos-app/pithos/api/test/views.py

b/snf-pithos-app/pithos/api/test/views.py
42 42
from synnefo.lib.services import get_service_path
43 43
from synnefo.lib import join_urls
44 44

  
45
from mock import patch
45
#from mock import patch
46 46
from urllib import quote
47
from urlparse import urlsplit, parse_qs
48 47

  
49 48
import django.utils.simplejson as json
49
from django.core.urlresolvers import reverse
50 50

  
51 51
import re
52 52
import datetime
53 53
import time as _time
54 54
import random
55
import urllib
56
import urlparse
55 57

  
56 58

  
57
class NotAllowedView(PithosAPITest):
58
    def head(self, url, *args, **kwargs):
59
        with patch("pithos.api.util.get_token_from_cookie") as m:
60
            m.return_value = 'token'
61
            return super(NotAllowedView, self).head(url, *args, **kwargs)
62

  
63
    def delete(self, url, *args, **kwargs):
64
        with patch("pithos.api.util.get_token_from_cookie") as m:
65
            m.return_value = 'token'
66
            return super(NotAllowedView, self).delete(url, *args, **kwargs)
67

  
68
    def post(self, url, *args, **kwargs):
69
        with patch("pithos.api.util.get_token_from_cookie") as m:
70
            m.return_value = 'token'
71
            return super(NotAllowedView, self).post(url, *args, **kwargs)
72

  
73
    def put(self, url, *args, **kwargs):
74
        with patch("pithos.api.util.get_token_from_cookie") as m:
75
            m.return_value = 'token'
76
            return super(NotAllowedView, self).put(url, *args, **kwargs)
77

  
78
    def copy(self, url, *args, **kwargs):
79
        with patch("pithos.api.util.get_token_from_cookie") as m:
80
            m.return_value = 'token'
81
            return super(NotAllowedView, self).copy(url, *args, **kwargs)
82

  
83
    def move(self, url, *args, **kwargs):
84
        with patch("pithos.api.util.get_token_from_cookie") as m:
85
            m.return_value = 'token'
86
            return super(NotAllowedView, self).move(url, *args, **kwargs)
59
def add_url_params(url, **kwargs):
60
    if not kwargs:
61
        return url
62
    parts = list(urlparse.urlsplit(url))
63
    params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True))
64
    params.update(kwargs)
65
    parts[3] = urllib.urlencode(params)
66
    return urlparse.urlunsplit(parts)
67

  
87 68

  
69
class NotAllowedView(PithosAPITest):
88 70
    def test_not_allowed(self):
89 71
        self.view_path = join_urls(get_service_path(
90 72
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
......
93 75

  
94 76
        r = self.head(self.view_url)
95 77
        self.assertEqual(r.status_code, 405)
96
        self.assertTrue('Allow' in r)
97
        self.assertEqual(r['Allow'],  'GET')
98 78

  
99 79
        r = self.delete(self.view_url)
100 80
        self.assertEqual(r.status_code, 405)
......
135 115
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
136 116
                                 self.oname)
137 117

  
138
    def get(self, url, user='user', *args, **kwargs):
139
        with patch("pithos.api.util.get_token_from_cookie") as m:
140
            m.return_value = 'token'
141
            return super(ObjectGetView, self).get(url, user='user', *args,
142
                                                  **kwargs)
118
    def view(self, url, access_token='valid_token', user='user', *args,
119
             **kwargs):
120

  
121
        params = {}
122
        if access_token is not None:
123
            params['access_token'] = access_token
124
        return self.get(add_url_params(url, **params), user, *args,
125
                        **kwargs)
143 126

  
144
    def test_no_cookie_redirect(self):
145
        r = super(ObjectGetView, self).get(self.view_url)
127
    def test_no_authorization_granted(self):
128
        r = self.get(self.view_url)
146 129
        self.assertEqual(r.status_code, 302)
147 130
        self.assertTrue('Location' in r)
148
        parts = list(urlsplit(r['Location']))
149
        qs = parse_qs(parts[3])
150
        self.assertTrue('next' in qs)
151
        self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST,
152
                                                  self.view_url))
131
        p = urlparse.urlparse(r['Location'])
132
        self.assertEqual(p.netloc, 'testserver')
133
        self.assertEqual(p.path, reverse('oa2_authenticate'))
134

  
135
        r = self.get(add_url_params(self.view_url, code='valid_code'),
136
                     follow=True)
137
        self.assertEqual(r.status_code, 200)
138
        self.assertTrue(r.content, self.odata)
139
        intermidiate_url = r.redirect_chain[0][0]
140
        p = urlparse.urlparse(intermidiate_url)
141
        params = urlparse.parse_qs(p.query)
142
        self.assertTrue('access_token' in params)
143

  
144
        r = self.get(add_url_params(self.view_url, access_token='valid_token'))
145
        self.assertEqual(r.status_code, 200)
146
        self.assertTrue(r.content, self.odata)
147

  
148
    def test_forbidden(self):
149
        container = self.create_container(user='alice')[0]
150
        obj = self.upload_object(container, user='alice')[0]
151

  
152
        url = join_urls(self.view_path, 'alice', container, obj)
153
        r = self.view(url)
154
        self.assertEqual(r.status_code, 403)
155

  
156
    def test_shared_with_me(self):
157
        container = self.create_container(user='alice')[0]
158
        obj, data = self.upload_object(container, user='alice')[:-1]
159

  
160
        # share object
161
        url = join_urls(self.pithos_path, 'alice', container, obj)
162
        self.post(url, user='alice', content_type='',
163
                  HTTP_CONTENT_RANGE='bytes */*',
164
                  HTTP_X_OBJECT_SHARING='read=user')
165

  
166
        url = join_urls(self.view_path, 'alice', container, obj)
167
        r = self.view(url)
168
        self.assertEqual(r.status_code, 200)
169
        self.assertEqual(r.content, data)
170

  
171
    def test_view(self):
172
        r = self.view(self.view_url)
173
        self.assertEqual(r.status_code, 200)
174
        self.assertEqual(r.content, self.odata)
175

  
176
    def test_not_existing(self):
177
        url = self.view_url[:-1]
178
        r = self.view(url)
179
        self.assertEqual(r.status_code, 404)
153 180

  
154 181
    def test_versions(self):
155 182
        c = self.cname
......
159 186
        r = self.post(self.api_url, content_type='', **meta)
160 187
        self.assertEqual(r.status_code, 202)
161 188

  
162
        r = self.get('%s?version=list&format=json' % self.view_url)
189
        r = self.view('%s?version=list&format=json' % self.view_url)
163 190
        self.assertEqual(r.status_code, 200)
164 191
        l1 = json.loads(r.content)['versions']
165 192
        self.assertEqual(len(l1), 2)
......
171 198
        self.assertEqual(r.status_code, 202)
172 199

  
173 200
        # assert a newly created version has been created
174
        r = self.get('%s?version=list&format=json' % self.view_url)
201
        r = self.view('%s?version=list&format=json' % self.view_url)
175 202
        self.assertEqual(r.status_code, 200)
176 203
        l2 = json.loads(r.content)['versions']
177 204
        self.assertEqual(len(l2), len(l1) + 1)
......
185 212
        self.append_object_data(c, o)
186 213

  
187 214
        # assert a newly created version has been created
188
        r = self.get('%s?version=list&format=json' % self.view_url)
215
        r = self.view('%s?version=list&format=json' % self.view_url)
189 216
        self.assertEqual(r.status_code, 200)
190 217
        l3 = json.loads(r.content)['versions']
191 218
        self.assertEqual(len(l3), len(l2) + 1)
......
194 221
    def test_objects_with_trailing_spaces(self):
195 222
        cname = self.cname
196 223

  
197
        r = self.get(quote('%s ' % self.view_url))
224
        r = self.view(quote('%s ' % self.view_url))
198 225
        self.assertEqual(r.status_code, 404)
199 226

  
200 227
        # delete object
201 228
        self.delete(self.api_url)
202 229

  
203
        r = self.get(self.view_url)
230
        r = self.view(self.view_url)
204 231
        self.assertEqual(r.status_code, 404)
205 232

  
206 233
        # upload object with trailing space
207 234
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
208 235

  
209 236
        view_url = join_urls(self.view_path, self.user, cname, oname)
210
        r = self.get(view_url)
237
        r = self.view(view_url)
211 238
        self.assertEqual(r.status_code, 200)
212 239

  
213 240
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
214
        r = self.get(view_url)
241
        r = self.view(view_url)
215 242
        self.assertEqual(r.status_code, 404)
216 243

  
217 244
    def test_get_partial(self):
218 245
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
219
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
246
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
220 247
        self.assertEqual(r.status_code, 206)
221 248
        self.assertEqual(r.content, self.odata[:limit + 1])
222 249
        self.assertTrue('Content-Range' in r)
......
227 254

  
228 255
    def test_get_range_not_satisfiable(self):
229 256
        # TODO
230
        #r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
257
        #r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
231 258
        #self.assertEqual(r.status_code, 416)
232 259

  
233 260
        offset = len(self.odata) + 1
234
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
261
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
235 262
        self.assertEqual(r.status_code, 416)
236 263

  
237 264
    def test_multiple_range(self):
238 265
        l = ['0-499', '-500', '1000-']
239 266
        ranges = 'bytes=%s' % ','.join(l)
240
        r = self.get(self.view_url, HTTP_RANGE=ranges)
267
        r = self.view(self.view_url, HTTP_RANGE=ranges)
241 268
        self.assertEqual(r.status_code, 206)
242 269
        self.assertTrue('content-type' in r)
243 270
        p = re.compile(
......
283 310
        out_of_range = len(self.odata) + 1
284 311
        l = ['0-499', '-500', '%d-' % out_of_range]
285 312
        ranges = 'bytes=%s' % ','.join(l)
286
        r = self.get(self.view_url, HTTP_RANGE=ranges)
313
        r = self.view(self.view_url, HTTP_RANGE=ranges)
287 314
        self.assertEqual(r.status_code, 416)
288 315

  
289 316
    def test_get_if_match(self):
......
292 319
        else:
293 320
            etag = merkle(self.odata)
294 321

  
295
        r = self.get(self.view_url, HTTP_IF_MATCH=etag)
322
        r = self.view(self.view_url, HTTP_IF_MATCH=etag)
296 323

  
297 324
        # assert get success
298 325
        self.assertEqual(r.status_code, 200)
......
301 328
        self.assertEqual(r.content, self.odata)
302 329

  
303 330
    def test_get_if_match_star(self):
304
        r = self.get(self.view_url, HTTP_IF_MATCH='*')
331
        r = self.view(self.view_url, HTTP_IF_MATCH='*')
305 332

  
306 333
        # assert get success
307 334
        self.assertEqual(r.status_code, 200)
......
316 343
            etag = merkle(self.odata)
317 344

  
318 345
        quoted = lambda s: '"%s"' % s
319
        r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
346
        r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
320 347
            [quoted(etag), quoted(get_random_data(64))]))
321 348

  
322 349
        # assert get success
......
326 353
        self.assertEqual(r.content, self.odata)
327 354

  
328 355
    def test_if_match_precondition_failed(self):
329
        r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
356
        r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
330 357
        self.assertEqual(r.status_code, 412)
331 358

  
332 359
    def test_if_none_match(self):
......
336 363
            etag = merkle(self.odata)
337 364

  
338 365
        # perform get with If-None-Match
339
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
366
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
340 367

  
341 368
        # assert precondition_failed
342 369
        self.assertEqual(r.status_code, 304)
......
346 373
        self.assertTrue(etag != r['ETag'])
347 374

  
348 375
        # perform get with If-None-Match
349
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
376
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
350 377

  
351 378
        # assert get success
352 379
        self.assertEqual(r.status_code, 200)
353 380

  
354 381
    def test_if_none_match_star(self):
355 382
        # perform get with If-None-Match with star
356
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
383
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
357 384
        self.assertEqual(r.status_code, 304)
358 385

  
359 386
    def test_if_modified_since(self):
......
365 392

  
366 393
        # Check not modified since
367 394
        for t in t1_formats:
368
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
395
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
369 396
            self.assertEqual(r.status_code, 304)
370 397

  
371 398
        _time.sleep(1)
......
375 402

  
376 403
        # Check modified since
377 404
        for t in t1_formats:
378
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
405
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
379 406
            self.assertEqual(r.status_code, 200)
380 407
            self.assertEqual(r.content, self.odata + appended_data)
381 408

  
382 409
    def test_if_modified_since_invalid_date(self):
383
        r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
410
        r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
384 411
        self.assertEqual(r.status_code, 200)
385 412
        self.assertEqual(r.content, self.odata)
386 413

  
......
393 420
        t1 = t + datetime.timedelta(seconds=1)
394 421
        t1_formats = map(t1.strftime, DATE_FORMATS)
395 422
        for t in t1_formats:
396
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
423
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
397 424
            self.assertEqual(r.status_code, 200)
398 425
            self.assertEqual(r.content, self.odata)
399 426

  
......
409 436

  
410 437
        # check modified
411 438
        for t in t2_formats:
412
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
439
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
413 440
            self.assertEqual(r.status_code, 412)
414 441

  
415 442
        # modify account: update object meta
......
424 451

  
425 452
        # check modified
426 453
        for t in t3_formats:
427
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
454
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
428 455
            self.assertEqual(r.status_code, 412)
429 456

  
430 457
    def test_if_unmodified_since(self):
......
435 462
        t_formats = map(t.strftime, DATE_FORMATS)
436 463

  
437 464
        for tf in t_formats:
438
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
465
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
439 466
            self.assertEqual(r.status_code, 200)
440 467
            self.assertEqual(r.content, self.odata)
441 468

  
......
447 474
        t_formats = map(t.strftime, DATE_FORMATS)
448 475

  
449 476
        for tf in t_formats:
450
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
477
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
451 478
            self.assertEqual(r.status_code, 412)
452 479

  
453 480
    def test_hashes(self):
......
456 483
        size = len(odata)
457 484

  
458 485
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
459
        r = self.get('%s?format=json&hashmap' % view_url)
486
        r = self.view('%s?format=json&hashmap' % view_url)
460 487
        self.assertEqual(r.status_code, 200)
461 488
        body = json.loads(r.content)
462 489

  

Also available in: Unified diff