Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 95b36144

History | View | Annotate | Download (14.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api import settings as pithos_settings
38
from pithos.api.test import PithosAPITest, DATE_FORMATS
39
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40
from pithos.api.test.objects import merkle
41

    
42
from synnefo.lib.services import get_service_path
43
from synnefo.lib import join_urls
44

    
45
from mock import patch
46
from urllib import quote
47
from urlparse import urlsplit, parse_qs
48

    
49
import django.utils.simplejson as json
50

    
51
import re
52
import datetime
53
import time as _time
54
import random
55

    
56

    
57
class ObjectGetView(PithosAPITest):
58
    def setUp(self):
59
        PithosAPITest.setUp(self)
60
        self.cname = self.create_container()[0]
61
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
62

    
63
        self.view_path = join_urls(get_service_path(
64
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
65
        self.view_url = join_urls(self.view_path, self.user, self.cname,
66
                                  self.oname)
67
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
68
                                 self.oname)
69

    
70
    def get(self, url, user='user', *args, **kwargs):
71
        with patch("pithos.api.util.get_token_from_cookie") as m:
72
            m.return_value = 'token'
73
            return super(ObjectGetView, self).get(url, user='user', *args,
74
                                                  **kwargs)
75

    
76
    def test_no_cookie_redirect(self):
77
        r = super(ObjectGetView, self).get(self.view_url)
78
        self.assertEqual(r.status_code, 302)
79
        self.assertTrue('Location' in r)
80
        parts = list(urlsplit(r['Location']))
81
        qs = parse_qs(parts[3])
82
        self.assertTrue('next' in qs)
83
        self.assertEqual(qs['next'][0], self.view_url)
84

    
85
    def test_versions(self):
86
        c = self.cname
87
        o = self.oname
88

    
89
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
90
        r = self.post(self.api_url, content_type='', **meta)
91
        self.assertEqual(r.status_code, 202)
92

    
93
        r = self.get('%s?version=list&format=json' % self.view_url)
94
        self.assertEqual(r.status_code, 200)
95
        l1 = json.loads(r.content)['versions']
96
        self.assertEqual(len(l1), 2)
97

    
98
        # update meta
99
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
100
                'HTTP_X_OBJECT_META_STOCK': 'True'}
101
        r = self.post(self.api_url, content_type='', **meta)
102
        self.assertEqual(r.status_code, 202)
103

    
104
        # assert a newly created version has been created
105
        r = self.get('%s?version=list&format=json' % self.view_url)
106
        self.assertEqual(r.status_code, 200)
107
        l2 = json.loads(r.content)['versions']
108
        self.assertEqual(len(l2), len(l1) + 1)
109
        self.assertEqual(l2[:-1], l1)
110

    
111
        vserial, _ = l2[-2]
112
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
113
                         {'X-Object-Meta-Quality': 'AAA'})
114

    
115
        # update data
116
        self.append_object_data(c, o)
117

    
118
        # assert a newly created version has been created
119
        r = self.get('%s?version=list&format=json' % self.view_url)
120
        self.assertEqual(r.status_code, 200)
121
        l3 = json.loads(r.content)['versions']
122
        self.assertEqual(len(l3), len(l2) + 1)
123
        self.assertEqual(l3[:-1], l2)
124

    
125
    def test_objects_with_trailing_spaces(self):
126
        cname = self.cname
127

    
128
        r = self.get(quote('%s ' % self.view_url))
129
        self.assertEqual(r.status_code, 404)
130

    
131
        # delete object
132
        self.delete(self.api_url)
133

    
134
        r = self.get(self.view_url)
135
        self.assertEqual(r.status_code, 404)
136

    
137
        # upload object with trailing space
138
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
139

    
140
        view_url = join_urls(self.view_path, self.user, cname, oname)
141
        r = self.get(view_url)
142
        self.assertEqual(r.status_code, 200)
143

    
144
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
145
        r = self.get(view_url)
146
        self.assertEqual(r.status_code, 404)
147

    
148
    def test_get_partial(self):
149
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
150
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
151
        self.assertEqual(r.status_code, 206)
152
        self.assertEqual(r.content, self.odata[:limit + 1])
153
        self.assertTrue('Content-Range' in r)
154
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
155
            limit, len(self.odata)))
156
        self.assertTrue('Content-Type' in r)
157
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
158

    
159
    def test_get_range_not_satisfiable(self):
160
        # TODO
161
        #r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
162
        #self.assertEqual(r.status_code, 416)
163

    
164
        offset = len(self.odata) + 1
165
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
166
        self.assertEqual(r.status_code, 416)
167

    
168
    def test_multiple_range(self):
169
        l = ['0-499', '-500', '1000-']
170
        ranges = 'bytes=%s' % ','.join(l)
171
        r = self.get(self.view_url, HTTP_RANGE=ranges)
172
        self.assertEqual(r.status_code, 206)
173
        self.assertTrue('content-type' in r)
174
        p = re.compile(
175
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
176
            re.I)
177
        m = p.match(r['content-type'])
178
        if m is None:
179
            self.fail('Invalid multiple range content type')
180
        boundary = m.groupdict()['boundary']
181
        cparts = r.content.split('--%s' % boundary)[1:-1]
182

    
183
        # assert content parts length
184
        self.assertEqual(len(cparts), len(l))
185

    
186
        # for each content part assert headers
187
        i = 0
188
        for cpart in cparts:
189
            content = cpart.split('\r\n')
190
            headers = content[1:3]
191
            content_range = headers[0].split(': ')
192
            self.assertEqual(content_range[0], 'Content-Range')
193

    
194
            r = l[i].split('-')
195
            if not r[0] and not r[1]:
196
                pass
197
            elif not r[0]:
198
                start = len(self.odata) - int(r[1])
199
                end = len(self.odata)
200
            elif not r[1]:
201
                start = int(r[0])
202
                end = len(self.odata)
203
            else:
204
                start = int(r[0])
205
                end = int(r[1]) + 1
206
            fdata = self.odata[start:end]
207
            sdata = '\r\n'.join(content[4:-1])
208
            self.assertEqual(len(fdata), len(sdata))
209
            self.assertEquals(fdata, sdata)
210
            i += 1
211

    
212
    def test_multiple_range_not_satisfiable(self):
213
        # perform get with multiple range
214
        out_of_range = len(self.odata) + 1
215
        l = ['0-499', '-500', '%d-' % out_of_range]
216
        ranges = 'bytes=%s' % ','.join(l)
217
        r = self.get(self.view_url, HTTP_RANGE=ranges)
218
        self.assertEqual(r.status_code, 416)
219

    
220
    def test_get_if_match(self):
221
        if pithos_settings.UPDATE_MD5:
222
            etag = md5_hash(self.odata)
223
        else:
224
            etag = merkle(self.odata)
225

    
226
        r = self.get(self.view_url, HTTP_IF_MATCH=etag)
227

    
228
        # assert get success
229
        self.assertEqual(r.status_code, 200)
230

    
231
        # assert response content
232
        self.assertEqual(r.content, self.odata)
233

    
234
    def test_get_if_match_star(self):
235
        r = self.get(self.view_url, HTTP_IF_MATCH='*')
236

    
237
        # assert get success
238
        self.assertEqual(r.status_code, 200)
239

    
240
        # assert response content
241
        self.assertEqual(r.content, self.odata)
242

    
243
    def test_get_multiple_if_match(self):
244
        if pithos_settings.UPDATE_MD5:
245
            etag = md5_hash(self.odata)
246
        else:
247
            etag = merkle(self.odata)
248

    
249
        quoted = lambda s: '"%s"' % s
250
        r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
251
            [quoted(etag), quoted(get_random_data(64))]))
252

    
253
        # assert get success
254
        self.assertEqual(r.status_code, 200)
255

    
256
        # assert response content
257
        self.assertEqual(r.content, self.odata)
258

    
259
    def test_if_match_precondition_failed(self):
260
        r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
261
        self.assertEqual(r.status_code, 412)
262

    
263
    def test_if_none_match(self):
264
        if pithos_settings.UPDATE_MD5:
265
            etag = md5_hash(self.odata)
266
        else:
267
            etag = merkle(self.odata)
268

    
269
        # perform get with If-None-Match
270
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
271

    
272
        # assert precondition_failed
273
        self.assertEqual(r.status_code, 304)
274

    
275
        # update object data
276
        r = self.append_object_data(self.cname, self.oname)[-1]
277
        self.assertTrue(etag != r['ETag'])
278

    
279
        # perform get with If-None-Match
280
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
281

    
282
        # assert get success
283
        self.assertEqual(r.status_code, 200)
284

    
285
    def test_if_none_match_star(self):
286
        # perform get with If-None-Match with star
287
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
288
        self.assertEqual(r.status_code, 304)
289

    
290
    def test_if_modified_since(self):
291
        # upload object
292
        object_info = self.get_object_info(self.cname, self.oname)
293
        last_modified = object_info['Last-Modified']
294
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
295
        t1_formats = map(t1.strftime, DATE_FORMATS)
296

    
297
        # Check not modified since
298
        for t in t1_formats:
299
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
300
            self.assertEqual(r.status_code, 304)
301

    
302
        _time.sleep(1)
303

    
304
        # update object data
305
        appended_data = self.append_object_data(self.cname, self.oname)[1]
306

    
307
        # Check modified since
308
        for t in t1_formats:
309
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
310
            self.assertEqual(r.status_code, 200)
311
            self.assertEqual(r.content, self.odata + appended_data)
312

    
313
    def test_if_modified_since_invalid_date(self):
314
        r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
315
        self.assertEqual(r.status_code, 200)
316
        self.assertEqual(r.content, self.odata)
317

    
318
    def test_if_not_modified_since(self):
319
        object_info = self.get_object_info(self.cname, self.oname)
320
        last_modified = object_info['Last-Modified']
321
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
322

    
323
        # Check unmodified
324
        t1 = t + datetime.timedelta(seconds=1)
325
        t1_formats = map(t1.strftime, DATE_FORMATS)
326
        for t in t1_formats:
327
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
328
            self.assertEqual(r.status_code, 200)
329
            self.assertEqual(r.content, self.odata)
330

    
331
        # modify object
332
        _time.sleep(2)
333
        self.append_object_data(self.cname, self.oname)
334

    
335
        object_info = self.get_object_info(self.cname, self.oname)
336
        last_modified = object_info['Last-Modified']
337
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
338
        t2 = t - datetime.timedelta(seconds=1)
339
        t2_formats = map(t2.strftime, DATE_FORMATS)
340

    
341
        # check modified
342
        for t in t2_formats:
343
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
344
            self.assertEqual(r.status_code, 412)
345

    
346
        # modify account: update object meta
347
        _time.sleep(1)
348
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
349

    
350
        object_info = self.get_object_info(self.cname, self.oname)
351
        last_modified = object_info['Last-Modified']
352
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
353
        t3 = t - datetime.timedelta(seconds=1)
354
        t3_formats = map(t3.strftime, DATE_FORMATS)
355

    
356
        # check modified
357
        for t in t3_formats:
358
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
359
            self.assertEqual(r.status_code, 412)
360

    
361
    def test_if_unmodified_since(self):
362
        object_info = self.get_object_info(self.cname, self.oname)
363
        last_modified = object_info['Last-Modified']
364
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
365
        t = t + datetime.timedelta(seconds=1)
366
        t_formats = map(t.strftime, DATE_FORMATS)
367

    
368
        for tf in t_formats:
369
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
370
            self.assertEqual(r.status_code, 200)
371
            self.assertEqual(r.content, self.odata)
372

    
373
    def test_if_unmodified_since_precondition_failed(self):
374
        object_info = self.get_object_info(self.cname, self.oname)
375
        last_modified = object_info['Last-Modified']
376
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
377
        t = t - datetime.timedelta(seconds=1)
378
        t_formats = map(t.strftime, DATE_FORMATS)
379

    
380
        for tf in t_formats:
381
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
382
            self.assertEqual(r.status_code, 412)
383

    
384
    def test_hashes(self):
385
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
386
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
387
        size = len(odata)
388

    
389
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
390
        r = self.get('%s?format=json&hashmap' % view_url)
391
        self.assertEqual(r.status_code, 200)
392
        body = json.loads(r.content)
393

    
394
        hashes = body['hashes']
395
        block_size = body['block_size']
396
        block_num = size / block_size if size / block_size == 0 else\
397
            size / block_size + 1
398
        self.assertTrue(len(hashes), block_num)
399
        i = 0
400
        for h in hashes:
401
            start = i * block_size
402
            end = (i + 1) * block_size
403
            hash = merkle(odata[start:end])
404
            self.assertEqual(h, hash)
405
            i += 1