Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 77b8a8e3

History | View | Annotate | Download (18.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api import settings as pithos_settings
38
from pithos.api.test import PithosAPITest, DATE_FORMATS
39
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40
from pithos.api.test.objects import merkle
41

    
42
from synnefo.lib.services import get_service_path
43
from synnefo.lib import join_urls
44

    
45
#from mock import patch
46
from urllib import quote
47

    
48
import django.utils.simplejson as json
49

    
50
import re
51
import datetime
52
import time as _time
53
import random
54
import urllib
55
import urlparse
56

    
57

    
58
def add_url_params(url, **kwargs):
59
    if not kwargs:
60
        return url
61
    parts = list(urlparse.urlsplit(url))
62
    params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True))
63
    params.update(kwargs)
64
    parts[3] = urllib.urlencode(params)
65
    return urlparse.urlunsplit(parts)
66

    
67

    
68
class NotAllowedView(PithosAPITest):
69
    def test_not_allowed(self):
70
        self.view_path = join_urls(get_service_path(
71
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
72
        self.view_url = join_urls(self.view_path, self.user, get_random_name(),
73
                                  get_random_name())
74

    
75
        r = self.delete(self.view_url)
76
        self.assertEqual(r.status_code, 405)
77
        self.assertTrue('Allow' in r)
78
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
79

    
80
        r = self.post(self.view_url)
81
        self.assertEqual(r.status_code, 405)
82
        self.assertTrue('Allow' in r)
83
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
84

    
85
        r = self.put(self.view_url)
86
        self.assertEqual(r.status_code, 405)
87
        self.assertTrue('Allow' in r)
88
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
89

    
90
        r = self.copy(self.view_url)
91
        self.assertEqual(r.status_code, 405)
92
        self.assertTrue('Allow' in r)
93
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
94

    
95
        r = self.move(self.view_url)
96
        self.assertEqual(r.status_code, 405)
97
        self.assertTrue('Allow' in r)
98
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
99

    
100

    
101
class ObjectGetView(PithosAPITest):
102
    def setUp(self):
103
        PithosAPITest.setUp(self)
104
        self.cname = self.create_container()[0]
105
        self.oname, self.odata = self.upload_object(self.cname,
106
                                                    'φωτογραφία.JPG')[:-1]
107

    
108
        self.view_path = join_urls(get_service_path(
109
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
110
        self.view_url = join_urls(self.view_path, self.user, self.cname,
111
                                  self.oname)
112
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
113
                                 self.oname)
114

    
115
    def view(self, url, access_token='valid_token', user='user', *args,
116
             **kwargs):
117

    
118
        params = {}
119
        if access_token is not None:
120
            params['access_token'] = access_token
121
        return self.get(add_url_params(url, **params), user, *args,
122
                        **kwargs)
123

    
124
    def test_no_authorization_granted(self):
125
        r = self.get(self.view_url)
126
        self.assertEqual(r.status_code, 302)
127
        self.assertTrue('Location' in r)
128
        p = urlparse.urlparse(r['Location'])
129
        self.assertEqual(p.netloc, 'testserver')
130
        self.assertEqual(p.path, '/astakos/oauth2/auth')
131

    
132
        r = self.get(add_url_params(self.view_url, code='valid_code'),
133
                     follow=True)
134
        self.assertEqual(r.status_code, 200)
135
        self.assertTrue(r.content, self.odata)
136
        intermidiate_url = r.redirect_chain[0][0]
137
        p = urlparse.urlparse(intermidiate_url)
138
        params = urlparse.parse_qs(p.query)
139
        self.assertTrue('access_token' in params)
140

    
141
        r = self.get(add_url_params(self.view_url, access_token='valid_token'))
142
        self.assertEqual(r.status_code, 200)
143
        self.assertTrue(r.content, self.odata)
144

    
145
        r = self.get('%s&disposition-type=inline' %
146
                     add_url_params(self.view_url, access_token='valid_token'))
147
        self.assertEqual(r.status_code, 200)
148
        self.assertTrue(r.content, self.odata)
149
        self.assertTrue('Content-Disposition' in r)
150
        self.assertTrue('inline' in r['Content-Disposition'])
151

    
152
        r = self.get('%s&disposition-type=attachment' %
153
                     add_url_params(self.view_url, access_token='valid_token'))
154
        self.assertEqual(r.status_code, 200)
155
        self.assertTrue(r.content, self.odata)
156
        self.assertTrue('Content-Disposition' in r)
157
        self.assertTrue('attachment' in r['Content-Disposition'])
158

    
159
    def test_forbidden(self):
160
        container = self.create_container(user='alice')[0]
161
        obj = self.upload_object(container, user='alice')[0]
162

    
163
        url = join_urls(self.view_path, 'alice', container, obj)
164
        r = self.view(url)
165
        self.assertEqual(r.status_code, 403)
166

    
167
    def test_shared_with_me(self):
168
        container = self.create_container(user='alice')[0]
169
        obj, data = self.upload_object(container, user='alice')[:-1]
170

    
171
        # share object
172
        url = join_urls(self.pithos_path, 'alice', container, obj)
173
        self.post(url, user='alice', content_type='',
174
                  HTTP_CONTENT_RANGE='bytes */*',
175
                  HTTP_X_OBJECT_SHARING='read=user')
176

    
177
        url = join_urls(self.view_path, 'alice', container, obj)
178
        r = self.view(url)
179
        self.assertEqual(r.status_code, 200)
180
        self.assertEqual(r.content, data)
181

    
182
    def test_view(self):
183
        r = self.view(self.view_url)
184
        self.assertEqual(r.status_code, 200)
185
        self.assertEqual(r.content, self.odata)
186

    
187
    def test_not_existing(self):
188
        url = self.view_url[:-1]
189
        r = self.view(url)
190
        self.assertEqual(r.status_code, 404)
191

    
192
    def test_versions(self):
193
        c = self.cname
194
        o = self.oname
195

    
196
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
197
        r = self.post(self.api_url, content_type='', **meta)
198
        self.assertEqual(r.status_code, 202)
199

    
200
        r = self.view('%s?version=list&format=json' % self.view_url)
201
        self.assertEqual(r.status_code, 200)
202
        l1 = json.loads(r.content)['versions']
203
        self.assertEqual(len(l1), 2)
204

    
205
        # update meta
206
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
207
                'HTTP_X_OBJECT_META_STOCK': 'True'}
208
        r = self.post(self.api_url, content_type='', **meta)
209
        self.assertEqual(r.status_code, 202)
210

    
211
        # assert a newly created version has been created
212
        r = self.view('%s?version=list&format=json' % self.view_url)
213
        self.assertEqual(r.status_code, 200)
214
        l2 = json.loads(r.content)['versions']
215
        self.assertEqual(len(l2), len(l1) + 1)
216
        self.assertEqual(l2[:-1], l1)
217

    
218
        vserial, _ = l2[-2]
219
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
220
                         {'Quality': 'AAA'})
221

    
222
        # update data
223
        self.append_object_data(c, o)
224

    
225
        # assert a newly created version has been created
226
        r = self.view('%s?version=list&format=json' % self.view_url)
227
        self.assertEqual(r.status_code, 200)
228
        l3 = json.loads(r.content)['versions']
229
        self.assertEqual(len(l3), len(l2) + 1)
230
        self.assertEqual(l3[:-1], l2)
231

    
232
    def test_objects_with_trailing_spaces(self):
233
        cname = self.cname
234

    
235
        r = self.view(quote('%s ' % self.view_url))
236
        self.assertEqual(r.status_code, 404)
237

    
238
        # delete object
239
        self.delete(self.api_url)
240

    
241
        r = self.view(self.view_url)
242
        self.assertEqual(r.status_code, 404)
243

    
244
        # upload object with trailing space
245
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
246

    
247
        view_url = join_urls(self.view_path, self.user, cname, oname)
248
        r = self.view(view_url)
249
        self.assertEqual(r.status_code, 200)
250

    
251
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
252
        r = self.view(view_url)
253
        self.assertEqual(r.status_code, 404)
254

    
255
    def test_get_partial(self):
256
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
257
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
258
        self.assertEqual(r.status_code, 206)
259
        self.assertEqual(r.content, self.odata[:limit + 1])
260
        self.assertTrue('Content-Range' in r)
261
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
262
            limit, len(self.odata)))
263
        self.assertTrue('Content-Type' in r)
264
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
265

    
266
    def test_get_range_not_satisfiable(self):
267
        # TODO
268
        #r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
269
        #self.assertEqual(r.status_code, 416)
270

    
271
        offset = len(self.odata) + 1
272
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
273
        self.assertEqual(r.status_code, 416)
274

    
275
    def test_multiple_range(self):
276
        l = ['0-499', '-500', '1000-']
277
        ranges = 'bytes=%s' % ','.join(l)
278
        r = self.view(self.view_url, HTTP_RANGE=ranges)
279
        self.assertEqual(r.status_code, 206)
280
        self.assertTrue('content-type' in r)
281
        p = re.compile(
282
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
283
            re.I)
284
        m = p.match(r['content-type'])
285
        if m is None:
286
            self.fail('Invalid multiple range content type')
287
        boundary = m.groupdict()['boundary']
288
        cparts = r.content.split('--%s' % boundary)[1:-1]
289

    
290
        # assert content parts length
291
        self.assertEqual(len(cparts), len(l))
292

    
293
        # for each content part assert headers
294
        i = 0
295
        for cpart in cparts:
296
            content = cpart.split('\r\n')
297
            headers = content[1:3]
298
            content_range = headers[0].split(': ')
299
            self.assertEqual(content_range[0], 'Content-Range')
300

    
301
            r = l[i].split('-')
302
            if not r[0] and not r[1]:
303
                pass
304
            elif not r[0]:
305
                start = len(self.odata) - int(r[1])
306
                end = len(self.odata)
307
            elif not r[1]:
308
                start = int(r[0])
309
                end = len(self.odata)
310
            else:
311
                start = int(r[0])
312
                end = int(r[1]) + 1
313
            fdata = self.odata[start:end]
314
            sdata = '\r\n'.join(content[4:-1])
315
            self.assertEqual(len(fdata), len(sdata))
316
            self.assertEquals(fdata, sdata)
317
            i += 1
318

    
319
    def test_multiple_range_not_satisfiable(self):
320
        # perform get with multiple range
321
        out_of_range = len(self.odata) + 1
322
        l = ['0-499', '-500', '%d-' % out_of_range]
323
        ranges = 'bytes=%s' % ','.join(l)
324
        r = self.view(self.view_url, HTTP_RANGE=ranges)
325
        self.assertEqual(r.status_code, 416)
326

    
327
    def test_get_if_match(self):
328
        if pithos_settings.UPDATE_MD5:
329
            etag = md5_hash(self.odata)
330
        else:
331
            etag = merkle(self.odata)
332

    
333
        r = self.view(self.view_url, HTTP_IF_MATCH=etag)
334

    
335
        # assert get success
336
        self.assertEqual(r.status_code, 200)
337

    
338
        # assert response content
339
        self.assertEqual(r.content, self.odata)
340

    
341
    def test_get_if_match_star(self):
342
        r = self.view(self.view_url, HTTP_IF_MATCH='*')
343

    
344
        # assert get success
345
        self.assertEqual(r.status_code, 200)
346

    
347
        # assert response content
348
        self.assertEqual(r.content, self.odata)
349

    
350
    def test_get_multiple_if_match(self):
351
        if pithos_settings.UPDATE_MD5:
352
            etag = md5_hash(self.odata)
353
        else:
354
            etag = merkle(self.odata)
355

    
356
        quoted = lambda s: '"%s"' % s
357
        r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
358
            [quoted(etag), quoted(get_random_data(64))]))
359

    
360
        # assert get success
361
        self.assertEqual(r.status_code, 200)
362

    
363
        # assert response content
364
        self.assertEqual(r.content, self.odata)
365

    
366
    def test_if_match_precondition_failed(self):
367
        r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
368
        self.assertEqual(r.status_code, 412)
369

    
370
    def test_if_none_match(self):
371
        if pithos_settings.UPDATE_MD5:
372
            etag = md5_hash(self.odata)
373
        else:
374
            etag = merkle(self.odata)
375

    
376
        # perform get with If-None-Match
377
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
378

    
379
        # assert precondition_failed
380
        self.assertEqual(r.status_code, 304)
381

    
382
        # update object data
383
        r = self.append_object_data(self.cname, self.oname)[-1]
384
        self.assertTrue(etag != r['ETag'])
385

    
386
        # perform get with If-None-Match
387
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
388

    
389
        # assert get success
390
        self.assertEqual(r.status_code, 200)
391

    
392
    def test_if_none_match_star(self):
393
        # perform get with If-None-Match with star
394
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
395
        self.assertEqual(r.status_code, 304)
396

    
397
    def test_if_modified_since(self):
398
        # upload object
399
        object_info = self.get_object_info(self.cname, self.oname)
400
        last_modified = object_info['Last-Modified']
401
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
402
        t1_formats = map(t1.strftime, DATE_FORMATS)
403

    
404
        # Check not modified since
405
        for t in t1_formats:
406
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
407
            self.assertEqual(r.status_code, 304)
408

    
409
        _time.sleep(1)
410

    
411
        # update object data
412
        appended_data = self.append_object_data(self.cname, self.oname)[1]
413

    
414
        # Check modified since
415
        for t in t1_formats:
416
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
417
            self.assertEqual(r.status_code, 200)
418
            self.assertEqual(r.content, self.odata + appended_data)
419

    
420
    def test_if_modified_since_invalid_date(self):
421
        r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
422
        self.assertEqual(r.status_code, 200)
423
        self.assertEqual(r.content, self.odata)
424

    
425
    def test_if_not_modified_since(self):
426
        object_info = self.get_object_info(self.cname, self.oname)
427
        last_modified = object_info['Last-Modified']
428
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
429

    
430
        # Check unmodified
431
        t1 = t + datetime.timedelta(seconds=1)
432
        t1_formats = map(t1.strftime, DATE_FORMATS)
433
        for t in t1_formats:
434
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
435
            self.assertEqual(r.status_code, 200)
436
            self.assertEqual(r.content, self.odata)
437

    
438
        # modify object
439
        _time.sleep(2)
440
        self.append_object_data(self.cname, self.oname)
441

    
442
        object_info = self.get_object_info(self.cname, self.oname)
443
        last_modified = object_info['Last-Modified']
444
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
445
        t2 = t - datetime.timedelta(seconds=1)
446
        t2_formats = map(t2.strftime, DATE_FORMATS)
447

    
448
        # check modified
449
        for t in t2_formats:
450
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
451
            self.assertEqual(r.status_code, 412)
452

    
453
        # modify account: update object meta
454
        _time.sleep(1)
455
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
456

    
457
        object_info = self.get_object_info(self.cname, self.oname)
458
        last_modified = object_info['Last-Modified']
459
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
460
        t3 = t - datetime.timedelta(seconds=1)
461
        t3_formats = map(t3.strftime, DATE_FORMATS)
462

    
463
        # check modified
464
        for t in t3_formats:
465
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
466
            self.assertEqual(r.status_code, 412)
467

    
468
    def test_if_unmodified_since(self):
469
        object_info = self.get_object_info(self.cname, self.oname)
470
        last_modified = object_info['Last-Modified']
471
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
472
        t = t + datetime.timedelta(seconds=1)
473
        t_formats = map(t.strftime, DATE_FORMATS)
474

    
475
        for tf in t_formats:
476
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
477
            self.assertEqual(r.status_code, 200)
478
            self.assertEqual(r.content, self.odata)
479

    
480
    def test_if_unmodified_since_precondition_failed(self):
481
        object_info = self.get_object_info(self.cname, self.oname)
482
        last_modified = object_info['Last-Modified']
483
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
484
        t = t - datetime.timedelta(seconds=1)
485
        t_formats = map(t.strftime, DATE_FORMATS)
486

    
487
        for tf in t_formats:
488
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
489
            self.assertEqual(r.status_code, 412)
490

    
491
    def test_hashes(self):
492
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
493
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
494
        size = len(odata)
495

    
496
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
497
        r = self.view('%s?format=json&hashmap' % view_url)
498
        self.assertEqual(r.status_code, 200)
499
        body = json.loads(r.content)
500

    
501
        hashes = body['hashes']
502
        block_size = body['block_size']
503
        block_num = size / block_size if size / block_size == 0 else\
504
            size / block_size + 1
505
        self.assertTrue(len(hashes), block_num)
506
        i = 0
507
        for h in hashes:
508
            start = i * block_size
509
            end = (i + 1) * block_size
510
            hash = merkle(odata[start:end])
511
            self.assertEqual(h, hash)
512
            i += 1