Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 3dce76b5

History | View | Annotate | Download (16.9 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api import settings as pithos_settings
38
from pithos.api.test import PithosAPITest, DATE_FORMATS
39
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40
from pithos.api.test.objects import merkle
41

    
42
from synnefo.lib.services import get_service_path
43
from synnefo.lib import join_urls
44

    
45
from mock import patch
46
from urllib import quote
47
from urlparse import urlsplit, parse_qs
48

    
49
import django.utils.simplejson as json
50

    
51
import re
52
import datetime
53
import time as _time
54
import random
55

    
56

    
57
class NotAllowedView(PithosAPITest):
58
    def head(self, url, *args, **kwargs):
59
        with patch("pithos.api.util.get_token_from_cookie") as m:
60
            m.return_value = 'token'
61
            return super(NotAllowedView, self).head(url, *args, **kwargs)
62

    
63
    def delete(self, url, *args, **kwargs):
64
        with patch("pithos.api.util.get_token_from_cookie") as m:
65
            m.return_value = 'token'
66
            return super(NotAllowedView, self).delete(url, *args, **kwargs)
67

    
68
    def post(self, url, *args, **kwargs):
69
        with patch("pithos.api.util.get_token_from_cookie") as m:
70
            m.return_value = 'token'
71
            return super(NotAllowedView, self).post(url, *args, **kwargs)
72

    
73
    def put(self, url, *args, **kwargs):
74
        with patch("pithos.api.util.get_token_from_cookie") as m:
75
            m.return_value = 'token'
76
            return super(NotAllowedView, self).put(url, *args, **kwargs)
77

    
78
    def copy(self, url, *args, **kwargs):
79
        with patch("pithos.api.util.get_token_from_cookie") as m:
80
            m.return_value = 'token'
81
            return super(NotAllowedView, self).copy(url, *args, **kwargs)
82

    
83
    def move(self, url, *args, **kwargs):
84
        with patch("pithos.api.util.get_token_from_cookie") as m:
85
            m.return_value = 'token'
86
            return super(NotAllowedView, self).move(url, *args, **kwargs)
87

    
88
    def test_not_allowed(self):
89
        self.view_path = join_urls(get_service_path(
90
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
91
        self.view_url = join_urls(self.view_path, self.user, get_random_name(),
92
                                  get_random_name())
93

    
94
        r = self.head(self.view_url)
95
        self.assertEqual(r.status_code, 400)
96

    
97
        r = self.delete(self.view_url)
98
        self.assertEqual(r.status_code, 400)
99

    
100
        r = self.post(self.view_url)
101
        self.assertEqual(r.status_code, 400)
102

    
103
        r = self.put(self.view_url)
104
        self.assertEqual(r.status_code, 400)
105

    
106
        r = self.copy(self.view_url)
107
        self.assertEqual(r.status_code, 400)
108

    
109
        r = self.move(self.view_url)
110
        self.assertEqual(r.status_code, 400)
111

    
112

    
113
class ObjectGetView(PithosAPITest):
114
    def setUp(self):
115
        PithosAPITest.setUp(self)
116
        self.cname = self.create_container()[0]
117
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
118

    
119
        self.view_path = join_urls(get_service_path(
120
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
121
        self.view_url = join_urls(self.view_path, self.user, self.cname,
122
                                  self.oname)
123
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
124
                                 self.oname)
125

    
126
    def get(self, url, user='user', *args, **kwargs):
127
        with patch("pithos.api.util.get_token_from_cookie") as m:
128
            m.return_value = 'token'
129
            return super(ObjectGetView, self).get(url, user='user', *args,
130
                                                  **kwargs)
131

    
132
    def test_no_cookie_redirect(self):
133
        r = super(ObjectGetView, self).get(self.view_url)
134
        self.assertEqual(r.status_code, 302)
135
        self.assertTrue('Location' in r)
136
        parts = list(urlsplit(r['Location']))
137
        qs = parse_qs(parts[3])
138
        self.assertTrue('next' in qs)
139
        self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST,
140
                                                  self.view_url))
141

    
142
    def test_versions(self):
143
        c = self.cname
144
        o = self.oname
145

    
146
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
147
        r = self.post(self.api_url, content_type='', **meta)
148
        self.assertEqual(r.status_code, 202)
149

    
150
        r = self.get('%s?version=list&format=json' % self.view_url)
151
        self.assertEqual(r.status_code, 200)
152
        l1 = json.loads(r.content)['versions']
153
        self.assertEqual(len(l1), 2)
154

    
155
        # update meta
156
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
157
                'HTTP_X_OBJECT_META_STOCK': 'True'}
158
        r = self.post(self.api_url, content_type='', **meta)
159
        self.assertEqual(r.status_code, 202)
160

    
161
        # assert a newly created version has been created
162
        r = self.get('%s?version=list&format=json' % self.view_url)
163
        self.assertEqual(r.status_code, 200)
164
        l2 = json.loads(r.content)['versions']
165
        self.assertEqual(len(l2), len(l1) + 1)
166
        self.assertEqual(l2[:-1], l1)
167

    
168
        vserial, _ = l2[-2]
169
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
170
                         {'Quality': 'AAA'})
171

    
172
        # update data
173
        self.append_object_data(c, o)
174

    
175
        # assert a newly created version has been created
176
        r = self.get('%s?version=list&format=json' % self.view_url)
177
        self.assertEqual(r.status_code, 200)
178
        l3 = json.loads(r.content)['versions']
179
        self.assertEqual(len(l3), len(l2) + 1)
180
        self.assertEqual(l3[:-1], l2)
181

    
182
    def test_objects_with_trailing_spaces(self):
183
        cname = self.cname
184

    
185
        r = self.get(quote('%s ' % self.view_url))
186
        self.assertEqual(r.status_code, 404)
187

    
188
        # delete object
189
        self.delete(self.api_url)
190

    
191
        r = self.get(self.view_url)
192
        self.assertEqual(r.status_code, 404)
193

    
194
        # upload object with trailing space
195
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
196

    
197
        view_url = join_urls(self.view_path, self.user, cname, oname)
198
        r = self.get(view_url)
199
        self.assertEqual(r.status_code, 200)
200

    
201
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
202
        r = self.get(view_url)
203
        self.assertEqual(r.status_code, 404)
204

    
205
    def test_get_partial(self):
206
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
207
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
208
        self.assertEqual(r.status_code, 206)
209
        self.assertEqual(r.content, self.odata[:limit + 1])
210
        self.assertTrue('Content-Range' in r)
211
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
212
            limit, len(self.odata)))
213
        self.assertTrue('Content-Type' in r)
214
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
215

    
216
    def test_get_range_not_satisfiable(self):
217
        # TODO
218
        #r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
219
        #self.assertEqual(r.status_code, 416)
220

    
221
        offset = len(self.odata) + 1
222
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
223
        self.assertEqual(r.status_code, 416)
224

    
225
    def test_multiple_range(self):
226
        l = ['0-499', '-500', '1000-']
227
        ranges = 'bytes=%s' % ','.join(l)
228
        r = self.get(self.view_url, HTTP_RANGE=ranges)
229
        self.assertEqual(r.status_code, 206)
230
        self.assertTrue('content-type' in r)
231
        p = re.compile(
232
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
233
            re.I)
234
        m = p.match(r['content-type'])
235
        if m is None:
236
            self.fail('Invalid multiple range content type')
237
        boundary = m.groupdict()['boundary']
238
        cparts = r.content.split('--%s' % boundary)[1:-1]
239

    
240
        # assert content parts length
241
        self.assertEqual(len(cparts), len(l))
242

    
243
        # for each content part assert headers
244
        i = 0
245
        for cpart in cparts:
246
            content = cpart.split('\r\n')
247
            headers = content[1:3]
248
            content_range = headers[0].split(': ')
249
            self.assertEqual(content_range[0], 'Content-Range')
250

    
251
            r = l[i].split('-')
252
            if not r[0] and not r[1]:
253
                pass
254
            elif not r[0]:
255
                start = len(self.odata) - int(r[1])
256
                end = len(self.odata)
257
            elif not r[1]:
258
                start = int(r[0])
259
                end = len(self.odata)
260
            else:
261
                start = int(r[0])
262
                end = int(r[1]) + 1
263
            fdata = self.odata[start:end]
264
            sdata = '\r\n'.join(content[4:-1])
265
            self.assertEqual(len(fdata), len(sdata))
266
            self.assertEquals(fdata, sdata)
267
            i += 1
268

    
269
    def test_multiple_range_not_satisfiable(self):
270
        # perform get with multiple range
271
        out_of_range = len(self.odata) + 1
272
        l = ['0-499', '-500', '%d-' % out_of_range]
273
        ranges = 'bytes=%s' % ','.join(l)
274
        r = self.get(self.view_url, HTTP_RANGE=ranges)
275
        self.assertEqual(r.status_code, 416)
276

    
277
    def test_get_if_match(self):
278
        if pithos_settings.UPDATE_MD5:
279
            etag = md5_hash(self.odata)
280
        else:
281
            etag = merkle(self.odata)
282

    
283
        r = self.get(self.view_url, HTTP_IF_MATCH=etag)
284

    
285
        # assert get success
286
        self.assertEqual(r.status_code, 200)
287

    
288
        # assert response content
289
        self.assertEqual(r.content, self.odata)
290

    
291
    def test_get_if_match_star(self):
292
        r = self.get(self.view_url, HTTP_IF_MATCH='*')
293

    
294
        # assert get success
295
        self.assertEqual(r.status_code, 200)
296

    
297
        # assert response content
298
        self.assertEqual(r.content, self.odata)
299

    
300
    def test_get_multiple_if_match(self):
301
        if pithos_settings.UPDATE_MD5:
302
            etag = md5_hash(self.odata)
303
        else:
304
            etag = merkle(self.odata)
305

    
306
        quoted = lambda s: '"%s"' % s
307
        r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
308
            [quoted(etag), quoted(get_random_data(64))]))
309

    
310
        # assert get success
311
        self.assertEqual(r.status_code, 200)
312

    
313
        # assert response content
314
        self.assertEqual(r.content, self.odata)
315

    
316
    def test_if_match_precondition_failed(self):
317
        r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
318
        self.assertEqual(r.status_code, 412)
319

    
320
    def test_if_none_match(self):
321
        if pithos_settings.UPDATE_MD5:
322
            etag = md5_hash(self.odata)
323
        else:
324
            etag = merkle(self.odata)
325

    
326
        # perform get with If-None-Match
327
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
328

    
329
        # assert precondition_failed
330
        self.assertEqual(r.status_code, 304)
331

    
332
        # update object data
333
        r = self.append_object_data(self.cname, self.oname)[-1]
334
        self.assertTrue(etag != r['ETag'])
335

    
336
        # perform get with If-None-Match
337
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
338

    
339
        # assert get success
340
        self.assertEqual(r.status_code, 200)
341

    
342
    def test_if_none_match_star(self):
343
        # perform get with If-None-Match with star
344
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
345
        self.assertEqual(r.status_code, 304)
346

    
347
    def test_if_modified_since(self):
348
        # upload object
349
        object_info = self.get_object_info(self.cname, self.oname)
350
        last_modified = object_info['Last-Modified']
351
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
352
        t1_formats = map(t1.strftime, DATE_FORMATS)
353

    
354
        # Check not modified since
355
        for t in t1_formats:
356
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
357
            self.assertEqual(r.status_code, 304)
358

    
359
        _time.sleep(1)
360

    
361
        # update object data
362
        appended_data = self.append_object_data(self.cname, self.oname)[1]
363

    
364
        # Check modified since
365
        for t in t1_formats:
366
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
367
            self.assertEqual(r.status_code, 200)
368
            self.assertEqual(r.content, self.odata + appended_data)
369

    
370
    def test_if_modified_since_invalid_date(self):
371
        r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
372
        self.assertEqual(r.status_code, 200)
373
        self.assertEqual(r.content, self.odata)
374

    
375
    def test_if_not_modified_since(self):
376
        object_info = self.get_object_info(self.cname, self.oname)
377
        last_modified = object_info['Last-Modified']
378
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
379

    
380
        # Check unmodified
381
        t1 = t + datetime.timedelta(seconds=1)
382
        t1_formats = map(t1.strftime, DATE_FORMATS)
383
        for t in t1_formats:
384
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
385
            self.assertEqual(r.status_code, 200)
386
            self.assertEqual(r.content, self.odata)
387

    
388
        # modify object
389
        _time.sleep(2)
390
        self.append_object_data(self.cname, self.oname)
391

    
392
        object_info = self.get_object_info(self.cname, self.oname)
393
        last_modified = object_info['Last-Modified']
394
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
395
        t2 = t - datetime.timedelta(seconds=1)
396
        t2_formats = map(t2.strftime, DATE_FORMATS)
397

    
398
        # check modified
399
        for t in t2_formats:
400
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
401
            self.assertEqual(r.status_code, 412)
402

    
403
        # modify account: update object meta
404
        _time.sleep(1)
405
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
406

    
407
        object_info = self.get_object_info(self.cname, self.oname)
408
        last_modified = object_info['Last-Modified']
409
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
410
        t3 = t - datetime.timedelta(seconds=1)
411
        t3_formats = map(t3.strftime, DATE_FORMATS)
412

    
413
        # check modified
414
        for t in t3_formats:
415
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
416
            self.assertEqual(r.status_code, 412)
417

    
418
    def test_if_unmodified_since(self):
419
        object_info = self.get_object_info(self.cname, self.oname)
420
        last_modified = object_info['Last-Modified']
421
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
422
        t = t + datetime.timedelta(seconds=1)
423
        t_formats = map(t.strftime, DATE_FORMATS)
424

    
425
        for tf in t_formats:
426
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
427
            self.assertEqual(r.status_code, 200)
428
            self.assertEqual(r.content, self.odata)
429

    
430
    def test_if_unmodified_since_precondition_failed(self):
431
        object_info = self.get_object_info(self.cname, self.oname)
432
        last_modified = object_info['Last-Modified']
433
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
434
        t = t - datetime.timedelta(seconds=1)
435
        t_formats = map(t.strftime, DATE_FORMATS)
436

    
437
        for tf in t_formats:
438
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
439
            self.assertEqual(r.status_code, 412)
440

    
441
    def test_hashes(self):
442
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
443
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
444
        size = len(odata)
445

    
446
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
447
        r = self.get('%s?format=json&hashmap' % view_url)
448
        self.assertEqual(r.status_code, 200)
449
        body = json.loads(r.content)
450

    
451
        hashes = body['hashes']
452
        block_size = body['block_size']
453
        block_num = size / block_size if size / block_size == 0 else\
454
            size / block_size + 1
455
        self.assertTrue(len(hashes), block_num)
456
        i = 0
457
        for h in hashes:
458
            start = i * block_size
459
            end = (i + 1) * block_size
460
            hash = merkle(odata[start:end])
461
            self.assertEqual(h, hash)
462
            i += 1