Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ f4f948c0

History | View | Annotate | Download (17.9 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api import settings as pithos_settings
38
from pithos.api.test import PithosAPITest, DATE_FORMATS
39
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40
from pithos.api.test.objects import merkle
41

    
42
from synnefo.lib.services import get_service_path
43
from synnefo.lib import join_urls
44

    
45
#from mock import patch
46
from urllib import quote
47

    
48
import django.utils.simplejson as json
49

    
50
import re
51
import datetime
52
import time as _time
53
import random
54
import urllib
55
import urlparse
56

    
57

    
58
def add_url_params(url, **kwargs):
59
    if not kwargs:
60
        return url
61
    parts = list(urlparse.urlsplit(url))
62
    params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True))
63
    params.update(kwargs)
64
    parts[3] = urllib.urlencode(params)
65
    return urlparse.urlunsplit(parts)
66

    
67

    
68
class NotAllowedView(PithosAPITest):
69
    def test_not_allowed(self):
70
        self.view_path = join_urls(get_service_path(
71
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
72
        self.view_url = join_urls(self.view_path, self.user, get_random_name(),
73
                                  get_random_name())
74

    
75
        r = self.delete(self.view_url)
76
        self.assertEqual(r.status_code, 405)
77
        self.assertTrue('Allow' in r)
78
        self.assertEqual(r['Allow'],  'GET')
79

    
80
        r = self.post(self.view_url)
81
        self.assertEqual(r.status_code, 405)
82
        self.assertTrue('Allow' in r)
83
        self.assertEqual(r['Allow'],  'GET')
84

    
85
        r = self.put(self.view_url)
86
        self.assertEqual(r.status_code, 405)
87
        self.assertTrue('Allow' in r)
88
        self.assertEqual(r['Allow'],  'GET')
89

    
90
        r = self.copy(self.view_url)
91
        self.assertEqual(r.status_code, 405)
92
        self.assertTrue('Allow' in r)
93
        self.assertEqual(r['Allow'],  'GET')
94

    
95
        r = self.move(self.view_url)
96
        self.assertEqual(r.status_code, 405)
97
        self.assertTrue('Allow' in r)
98
        self.assertEqual(r['Allow'],  'GET')
99

    
100

    
101
class ObjectGetView(PithosAPITest):
102
    def setUp(self):
103
        PithosAPITest.setUp(self)
104
        self.cname = self.create_container()[0]
105
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
106

    
107
        self.view_path = join_urls(get_service_path(
108
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
109
        self.view_url = join_urls(self.view_path, self.user, self.cname,
110
                                  self.oname)
111
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
112
                                 self.oname)
113

    
114
    def view(self, url, access_token='valid_token', user='user', *args,
115
             **kwargs):
116

    
117
        params = {}
118
        if access_token is not None:
119
            params['access_token'] = access_token
120
        return self.get(add_url_params(url, **params), user, *args,
121
                        **kwargs)
122

    
123
    def test_no_authorization_granted(self):
124
        r = self.get(self.view_url)
125
        self.assertEqual(r.status_code, 302)
126
        self.assertTrue('Location' in r)
127
        p = urlparse.urlparse(r['Location'])
128
        self.assertEqual(p.netloc, 'testserver')
129
        self.assertEqual(p.path, '/astakos/oauth2/auth')
130

    
131
        r = self.get(add_url_params(self.view_url, code='valid_code'),
132
                     follow=True)
133
        self.assertEqual(r.status_code, 200)
134
        self.assertTrue(r.content, self.odata)
135
        intermidiate_url = r.redirect_chain[0][0]
136
        p = urlparse.urlparse(intermidiate_url)
137
        params = urlparse.parse_qs(p.query)
138
        self.assertTrue('access_token' in params)
139

    
140
        r = self.get(add_url_params(self.view_url, access_token='valid_token'))
141
        self.assertEqual(r.status_code, 200)
142
        self.assertTrue(r.content, self.odata)
143

    
144
    def test_forbidden(self):
145
        container = self.create_container(user='alice')[0]
146
        obj = self.upload_object(container, user='alice')[0]
147

    
148
        url = join_urls(self.view_path, 'alice', container, obj)
149
        r = self.view(url)
150
        self.assertEqual(r.status_code, 403)
151

    
152
    def test_shared_with_me(self):
153
        container = self.create_container(user='alice')[0]
154
        obj, data = self.upload_object(container, user='alice')[:-1]
155

    
156
        # share object
157
        url = join_urls(self.pithos_path, 'alice', container, obj)
158
        self.post(url, user='alice', content_type='',
159
                  HTTP_CONTENT_RANGE='bytes */*',
160
                  HTTP_X_OBJECT_SHARING='read=user')
161

    
162
        url = join_urls(self.view_path, 'alice', container, obj)
163
        r = self.view(url)
164
        self.assertEqual(r.status_code, 200)
165
        self.assertEqual(r.content, data)
166

    
167
    def test_view(self):
168
        r = self.view(self.view_url)
169
        self.assertEqual(r.status_code, 200)
170
        self.assertEqual(r.content, self.odata)
171

    
172
    def test_not_existing(self):
173
        url = self.view_url[:-1]
174
        r = self.view(url)
175
        self.assertEqual(r.status_code, 404)
176

    
177
    def test_versions(self):
178
        c = self.cname
179
        o = self.oname
180

    
181
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
182
        r = self.post(self.api_url, content_type='', **meta)
183
        self.assertEqual(r.status_code, 202)
184

    
185
        r = self.view('%s?version=list&format=json' % self.view_url)
186
        self.assertEqual(r.status_code, 200)
187
        l1 = json.loads(r.content)['versions']
188
        self.assertEqual(len(l1), 2)
189

    
190
        # update meta
191
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
192
                'HTTP_X_OBJECT_META_STOCK': 'True'}
193
        r = self.post(self.api_url, content_type='', **meta)
194
        self.assertEqual(r.status_code, 202)
195

    
196
        # assert a newly created version has been created
197
        r = self.view('%s?version=list&format=json' % self.view_url)
198
        self.assertEqual(r.status_code, 200)
199
        l2 = json.loads(r.content)['versions']
200
        self.assertEqual(len(l2), len(l1) + 1)
201
        self.assertEqual(l2[:-1], l1)
202

    
203
        vserial, _ = l2[-2]
204
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
205
                         {'Quality': 'AAA'})
206

    
207
        # update data
208
        self.append_object_data(c, o)
209

    
210
        # assert a newly created version has been created
211
        r = self.view('%s?version=list&format=json' % self.view_url)
212
        self.assertEqual(r.status_code, 200)
213
        l3 = json.loads(r.content)['versions']
214
        self.assertEqual(len(l3), len(l2) + 1)
215
        self.assertEqual(l3[:-1], l2)
216

    
217
    def test_objects_with_trailing_spaces(self):
218
        cname = self.cname
219

    
220
        r = self.view(quote('%s ' % self.view_url))
221
        self.assertEqual(r.status_code, 404)
222

    
223
        # delete object
224
        self.delete(self.api_url)
225

    
226
        r = self.view(self.view_url)
227
        self.assertEqual(r.status_code, 404)
228

    
229
        # upload object with trailing space
230
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
231

    
232
        view_url = join_urls(self.view_path, self.user, cname, oname)
233
        r = self.view(view_url)
234
        self.assertEqual(r.status_code, 200)
235

    
236
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
237
        r = self.view(view_url)
238
        self.assertEqual(r.status_code, 404)
239

    
240
    def test_get_partial(self):
241
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
242
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
243
        self.assertEqual(r.status_code, 206)
244
        self.assertEqual(r.content, self.odata[:limit + 1])
245
        self.assertTrue('Content-Range' in r)
246
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
247
            limit, len(self.odata)))
248
        self.assertTrue('Content-Type' in r)
249
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
250

    
251
    def test_get_range_not_satisfiable(self):
252
        # TODO
253
        #r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
254
        #self.assertEqual(r.status_code, 416)
255

    
256
        offset = len(self.odata) + 1
257
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
258
        self.assertEqual(r.status_code, 416)
259

    
260
    def test_multiple_range(self):
261
        l = ['0-499', '-500', '1000-']
262
        ranges = 'bytes=%s' % ','.join(l)
263
        r = self.view(self.view_url, HTTP_RANGE=ranges)
264
        self.assertEqual(r.status_code, 206)
265
        self.assertTrue('content-type' in r)
266
        p = re.compile(
267
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
268
            re.I)
269
        m = p.match(r['content-type'])
270
        if m is None:
271
            self.fail('Invalid multiple range content type')
272
        boundary = m.groupdict()['boundary']
273
        cparts = r.content.split('--%s' % boundary)[1:-1]
274

    
275
        # assert content parts length
276
        self.assertEqual(len(cparts), len(l))
277

    
278
        # for each content part assert headers
279
        i = 0
280
        for cpart in cparts:
281
            content = cpart.split('\r\n')
282
            headers = content[1:3]
283
            content_range = headers[0].split(': ')
284
            self.assertEqual(content_range[0], 'Content-Range')
285

    
286
            r = l[i].split('-')
287
            if not r[0] and not r[1]:
288
                pass
289
            elif not r[0]:
290
                start = len(self.odata) - int(r[1])
291
                end = len(self.odata)
292
            elif not r[1]:
293
                start = int(r[0])
294
                end = len(self.odata)
295
            else:
296
                start = int(r[0])
297
                end = int(r[1]) + 1
298
            fdata = self.odata[start:end]
299
            sdata = '\r\n'.join(content[4:-1])
300
            self.assertEqual(len(fdata), len(sdata))
301
            self.assertEquals(fdata, sdata)
302
            i += 1
303

    
304
    def test_multiple_range_not_satisfiable(self):
305
        # perform get with multiple range
306
        out_of_range = len(self.odata) + 1
307
        l = ['0-499', '-500', '%d-' % out_of_range]
308
        ranges = 'bytes=%s' % ','.join(l)
309
        r = self.view(self.view_url, HTTP_RANGE=ranges)
310
        self.assertEqual(r.status_code, 416)
311

    
312
    def test_get_if_match(self):
313
        if pithos_settings.UPDATE_MD5:
314
            etag = md5_hash(self.odata)
315
        else:
316
            etag = merkle(self.odata)
317

    
318
        r = self.view(self.view_url, HTTP_IF_MATCH=etag)
319

    
320
        # assert get success
321
        self.assertEqual(r.status_code, 200)
322

    
323
        # assert response content
324
        self.assertEqual(r.content, self.odata)
325

    
326
    def test_get_if_match_star(self):
327
        r = self.view(self.view_url, HTTP_IF_MATCH='*')
328

    
329
        # assert get success
330
        self.assertEqual(r.status_code, 200)
331

    
332
        # assert response content
333
        self.assertEqual(r.content, self.odata)
334

    
335
    def test_get_multiple_if_match(self):
336
        if pithos_settings.UPDATE_MD5:
337
            etag = md5_hash(self.odata)
338
        else:
339
            etag = merkle(self.odata)
340

    
341
        quoted = lambda s: '"%s"' % s
342
        r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
343
            [quoted(etag), quoted(get_random_data(64))]))
344

    
345
        # assert get success
346
        self.assertEqual(r.status_code, 200)
347

    
348
        # assert response content
349
        self.assertEqual(r.content, self.odata)
350

    
351
    def test_if_match_precondition_failed(self):
352
        r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
353
        self.assertEqual(r.status_code, 412)
354

    
355
    def test_if_none_match(self):
356
        if pithos_settings.UPDATE_MD5:
357
            etag = md5_hash(self.odata)
358
        else:
359
            etag = merkle(self.odata)
360

    
361
        # perform get with If-None-Match
362
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
363

    
364
        # assert precondition_failed
365
        self.assertEqual(r.status_code, 304)
366

    
367
        # update object data
368
        r = self.append_object_data(self.cname, self.oname)[-1]
369
        self.assertTrue(etag != r['ETag'])
370

    
371
        # perform get with If-None-Match
372
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
373

    
374
        # assert get success
375
        self.assertEqual(r.status_code, 200)
376

    
377
    def test_if_none_match_star(self):
378
        # perform get with If-None-Match with star
379
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
380
        self.assertEqual(r.status_code, 304)
381

    
382
    def test_if_modified_since(self):
383
        # upload object
384
        object_info = self.get_object_info(self.cname, self.oname)
385
        last_modified = object_info['Last-Modified']
386
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
387
        t1_formats = map(t1.strftime, DATE_FORMATS)
388

    
389
        # Check not modified since
390
        for t in t1_formats:
391
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
392
            self.assertEqual(r.status_code, 304)
393

    
394
        _time.sleep(1)
395

    
396
        # update object data
397
        appended_data = self.append_object_data(self.cname, self.oname)[1]
398

    
399
        # Check modified since
400
        for t in t1_formats:
401
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
402
            self.assertEqual(r.status_code, 200)
403
            self.assertEqual(r.content, self.odata + appended_data)
404

    
405
    def test_if_modified_since_invalid_date(self):
406
        r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
407
        self.assertEqual(r.status_code, 200)
408
        self.assertEqual(r.content, self.odata)
409

    
410
    def test_if_not_modified_since(self):
411
        object_info = self.get_object_info(self.cname, self.oname)
412
        last_modified = object_info['Last-Modified']
413
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
414

    
415
        # Check unmodified
416
        t1 = t + datetime.timedelta(seconds=1)
417
        t1_formats = map(t1.strftime, DATE_FORMATS)
418
        for t in t1_formats:
419
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
420
            self.assertEqual(r.status_code, 200)
421
            self.assertEqual(r.content, self.odata)
422

    
423
        # modify object
424
        _time.sleep(2)
425
        self.append_object_data(self.cname, self.oname)
426

    
427
        object_info = self.get_object_info(self.cname, self.oname)
428
        last_modified = object_info['Last-Modified']
429
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
430
        t2 = t - datetime.timedelta(seconds=1)
431
        t2_formats = map(t2.strftime, DATE_FORMATS)
432

    
433
        # check modified
434
        for t in t2_formats:
435
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
436
            self.assertEqual(r.status_code, 412)
437

    
438
        # modify account: update object meta
439
        _time.sleep(1)
440
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
441

    
442
        object_info = self.get_object_info(self.cname, self.oname)
443
        last_modified = object_info['Last-Modified']
444
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
445
        t3 = t - datetime.timedelta(seconds=1)
446
        t3_formats = map(t3.strftime, DATE_FORMATS)
447

    
448
        # check modified
449
        for t in t3_formats:
450
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
451
            self.assertEqual(r.status_code, 412)
452

    
453
    def test_if_unmodified_since(self):
454
        object_info = self.get_object_info(self.cname, self.oname)
455
        last_modified = object_info['Last-Modified']
456
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
457
        t = t + datetime.timedelta(seconds=1)
458
        t_formats = map(t.strftime, DATE_FORMATS)
459

    
460
        for tf in t_formats:
461
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
462
            self.assertEqual(r.status_code, 200)
463
            self.assertEqual(r.content, self.odata)
464

    
465
    def test_if_unmodified_since_precondition_failed(self):
466
        object_info = self.get_object_info(self.cname, self.oname)
467
        last_modified = object_info['Last-Modified']
468
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
469
        t = t - datetime.timedelta(seconds=1)
470
        t_formats = map(t.strftime, DATE_FORMATS)
471

    
472
        for tf in t_formats:
473
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
474
            self.assertEqual(r.status_code, 412)
475

    
476
    def test_hashes(self):
477
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
478
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
479
        size = len(odata)
480

    
481
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
482
        r = self.view('%s?format=json&hashmap' % view_url)
483
        self.assertEqual(r.status_code, 200)
484
        body = json.loads(r.content)
485

    
486
        hashes = body['hashes']
487
        block_size = body['block_size']
488
        block_num = size / block_size if size / block_size == 0 else\
489
            size / block_size + 1
490
        self.assertTrue(len(hashes), block_num)
491
        i = 0
492
        for h in hashes:
493
            start = i * block_size
494
            end = (i + 1) * block_size
495
            hash = merkle(odata[start:end])
496
            self.assertEqual(h, hash)
497
            i += 1