Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 6ee6677e

History | View | Annotate | Download (14.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api import settings as pithos_settings
38
from pithos.api.test import PithosAPITest, DATE_FORMATS
39
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40
from pithos.api.test.objects import merkle
41

    
42
from synnefo.lib.services import get_service_path
43
from synnefo.lib import join_urls
44

    
45
from mock import patch
46
from urllib import quote
47
from urlparse import urlsplit, parse_qs
48

    
49
import django.utils.simplejson as json
50

    
51
import re
52
import datetime
53
import time as _time
54
import random
55

    
56

    
57
class ObjectGetView(PithosAPITest):
58
    def setUp(self):
59
        PithosAPITest.setUp(self)
60
        self.cname = self.create_container()[0]
61
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
62

    
63
        self.view_path = join_urls(get_service_path(
64
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
65
        self.view_url = join_urls(self.view_path, self.user, self.cname,
66
                                  self.oname)
67
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
68
                                 self.oname)
69

    
70
    def get(self, url, user='user', *args, **kwargs):
71
        with patch("pithos.api.util.get_token_from_cookie") as m:
72
            m.return_value = 'token'
73
            return super(ObjectGetView, self).get(url, user='user', *args,
74
                                                  **kwargs)
75

    
76
    def test_no_cookie_redirect(self):
77
        r = super(ObjectGetView, self).get(self.view_url)
78
        self.assertEqual(r.status_code, 302)
79
        self.assertTrue('Location' in r)
80
        parts = list(urlsplit(r['Location']))
81
        qs = parse_qs(parts[3])
82
        self.assertTrue('next' in qs)
83
        self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST,
84
                                                  self.view_url))
85

    
86
    def test_versions(self):
87
        c = self.cname
88
        o = self.oname
89

    
90
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
91
        r = self.post(self.api_url, content_type='', **meta)
92
        self.assertEqual(r.status_code, 202)
93

    
94
        r = self.get('%s?version=list&format=json' % self.view_url)
95
        self.assertEqual(r.status_code, 200)
96
        l1 = json.loads(r.content)['versions']
97
        self.assertEqual(len(l1), 2)
98

    
99
        # update meta
100
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
101
                'HTTP_X_OBJECT_META_STOCK': 'True'}
102
        r = self.post(self.api_url, content_type='', **meta)
103
        self.assertEqual(r.status_code, 202)
104

    
105
        # assert a newly created version has been created
106
        r = self.get('%s?version=list&format=json' % self.view_url)
107
        self.assertEqual(r.status_code, 200)
108
        l2 = json.loads(r.content)['versions']
109
        self.assertEqual(len(l2), len(l1) + 1)
110
        self.assertEqual(l2[:-1], l1)
111

    
112
        vserial, _ = l2[-2]
113
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
114
                         {'Quality': 'AAA'})
115

    
116
        # update data
117
        self.append_object_data(c, o)
118

    
119
        # assert a newly created version has been created
120
        r = self.get('%s?version=list&format=json' % self.view_url)
121
        self.assertEqual(r.status_code, 200)
122
        l3 = json.loads(r.content)['versions']
123
        self.assertEqual(len(l3), len(l2) + 1)
124
        self.assertEqual(l3[:-1], l2)
125

    
126
    def test_objects_with_trailing_spaces(self):
127
        cname = self.cname
128

    
129
        r = self.get(quote('%s ' % self.view_url))
130
        self.assertEqual(r.status_code, 404)
131

    
132
        # delete object
133
        self.delete(self.api_url)
134

    
135
        r = self.get(self.view_url)
136
        self.assertEqual(r.status_code, 404)
137

    
138
        # upload object with trailing space
139
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
140

    
141
        view_url = join_urls(self.view_path, self.user, cname, oname)
142
        r = self.get(view_url)
143
        self.assertEqual(r.status_code, 200)
144

    
145
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
146
        r = self.get(view_url)
147
        self.assertEqual(r.status_code, 404)
148

    
149
    def test_get_partial(self):
150
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
151
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
152
        self.assertEqual(r.status_code, 206)
153
        self.assertEqual(r.content, self.odata[:limit + 1])
154
        self.assertTrue('Content-Range' in r)
155
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
156
            limit, len(self.odata)))
157
        self.assertTrue('Content-Type' in r)
158
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
159

    
160
    def test_get_range_not_satisfiable(self):
161
        # TODO
162
        #r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
163
        #self.assertEqual(r.status_code, 416)
164

    
165
        offset = len(self.odata) + 1
166
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
167
        self.assertEqual(r.status_code, 416)
168

    
169
    def test_multiple_range(self):
170
        l = ['0-499', '-500', '1000-']
171
        ranges = 'bytes=%s' % ','.join(l)
172
        r = self.get(self.view_url, HTTP_RANGE=ranges)
173
        self.assertEqual(r.status_code, 206)
174
        self.assertTrue('content-type' in r)
175
        p = re.compile(
176
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
177
            re.I)
178
        m = p.match(r['content-type'])
179
        if m is None:
180
            self.fail('Invalid multiple range content type')
181
        boundary = m.groupdict()['boundary']
182
        cparts = r.content.split('--%s' % boundary)[1:-1]
183

    
184
        # assert content parts length
185
        self.assertEqual(len(cparts), len(l))
186

    
187
        # for each content part assert headers
188
        i = 0
189
        for cpart in cparts:
190
            content = cpart.split('\r\n')
191
            headers = content[1:3]
192
            content_range = headers[0].split(': ')
193
            self.assertEqual(content_range[0], 'Content-Range')
194

    
195
            r = l[i].split('-')
196
            if not r[0] and not r[1]:
197
                pass
198
            elif not r[0]:
199
                start = len(self.odata) - int(r[1])
200
                end = len(self.odata)
201
            elif not r[1]:
202
                start = int(r[0])
203
                end = len(self.odata)
204
            else:
205
                start = int(r[0])
206
                end = int(r[1]) + 1
207
            fdata = self.odata[start:end]
208
            sdata = '\r\n'.join(content[4:-1])
209
            self.assertEqual(len(fdata), len(sdata))
210
            self.assertEquals(fdata, sdata)
211
            i += 1
212

    
213
    def test_multiple_range_not_satisfiable(self):
214
        # perform get with multiple range
215
        out_of_range = len(self.odata) + 1
216
        l = ['0-499', '-500', '%d-' % out_of_range]
217
        ranges = 'bytes=%s' % ','.join(l)
218
        r = self.get(self.view_url, HTTP_RANGE=ranges)
219
        self.assertEqual(r.status_code, 416)
220

    
221
    def test_get_if_match(self):
222
        if pithos_settings.UPDATE_MD5:
223
            etag = md5_hash(self.odata)
224
        else:
225
            etag = merkle(self.odata)
226

    
227
        r = self.get(self.view_url, HTTP_IF_MATCH=etag)
228

    
229
        # assert get success
230
        self.assertEqual(r.status_code, 200)
231

    
232
        # assert response content
233
        self.assertEqual(r.content, self.odata)
234

    
235
    def test_get_if_match_star(self):
236
        r = self.get(self.view_url, HTTP_IF_MATCH='*')
237

    
238
        # assert get success
239
        self.assertEqual(r.status_code, 200)
240

    
241
        # assert response content
242
        self.assertEqual(r.content, self.odata)
243

    
244
    def test_get_multiple_if_match(self):
245
        if pithos_settings.UPDATE_MD5:
246
            etag = md5_hash(self.odata)
247
        else:
248
            etag = merkle(self.odata)
249

    
250
        quoted = lambda s: '"%s"' % s
251
        r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
252
            [quoted(etag), quoted(get_random_data(64))]))
253

    
254
        # assert get success
255
        self.assertEqual(r.status_code, 200)
256

    
257
        # assert response content
258
        self.assertEqual(r.content, self.odata)
259

    
260
    def test_if_match_precondition_failed(self):
261
        r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
262
        self.assertEqual(r.status_code, 412)
263

    
264
    def test_if_none_match(self):
265
        if pithos_settings.UPDATE_MD5:
266
            etag = md5_hash(self.odata)
267
        else:
268
            etag = merkle(self.odata)
269

    
270
        # perform get with If-None-Match
271
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
272

    
273
        # assert precondition_failed
274
        self.assertEqual(r.status_code, 304)
275

    
276
        # update object data
277
        r = self.append_object_data(self.cname, self.oname)[-1]
278
        self.assertTrue(etag != r['ETag'])
279

    
280
        # perform get with If-None-Match
281
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
282

    
283
        # assert get success
284
        self.assertEqual(r.status_code, 200)
285

    
286
    def test_if_none_match_star(self):
287
        # perform get with If-None-Match with star
288
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
289
        self.assertEqual(r.status_code, 304)
290

    
291
    def test_if_modified_since(self):
292
        # upload object
293
        object_info = self.get_object_info(self.cname, self.oname)
294
        last_modified = object_info['Last-Modified']
295
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
296
        t1_formats = map(t1.strftime, DATE_FORMATS)
297

    
298
        # Check not modified since
299
        for t in t1_formats:
300
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
301
            self.assertEqual(r.status_code, 304)
302

    
303
        _time.sleep(1)
304

    
305
        # update object data
306
        appended_data = self.append_object_data(self.cname, self.oname)[1]
307

    
308
        # Check modified since
309
        for t in t1_formats:
310
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
311
            self.assertEqual(r.status_code, 200)
312
            self.assertEqual(r.content, self.odata + appended_data)
313

    
314
    def test_if_modified_since_invalid_date(self):
315
        r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
316
        self.assertEqual(r.status_code, 200)
317
        self.assertEqual(r.content, self.odata)
318

    
319
    def test_if_not_modified_since(self):
320
        object_info = self.get_object_info(self.cname, self.oname)
321
        last_modified = object_info['Last-Modified']
322
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
323

    
324
        # Check unmodified
325
        t1 = t + datetime.timedelta(seconds=1)
326
        t1_formats = map(t1.strftime, DATE_FORMATS)
327
        for t in t1_formats:
328
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
329
            self.assertEqual(r.status_code, 200)
330
            self.assertEqual(r.content, self.odata)
331

    
332
        # modify object
333
        _time.sleep(2)
334
        self.append_object_data(self.cname, self.oname)
335

    
336
        object_info = self.get_object_info(self.cname, self.oname)
337
        last_modified = object_info['Last-Modified']
338
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
339
        t2 = t - datetime.timedelta(seconds=1)
340
        t2_formats = map(t2.strftime, DATE_FORMATS)
341

    
342
        # check modified
343
        for t in t2_formats:
344
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
345
            self.assertEqual(r.status_code, 412)
346

    
347
        # modify account: update object meta
348
        _time.sleep(1)
349
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
350

    
351
        object_info = self.get_object_info(self.cname, self.oname)
352
        last_modified = object_info['Last-Modified']
353
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
354
        t3 = t - datetime.timedelta(seconds=1)
355
        t3_formats = map(t3.strftime, DATE_FORMATS)
356

    
357
        # check modified
358
        for t in t3_formats:
359
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
360
            self.assertEqual(r.status_code, 412)
361

    
362
    def test_if_unmodified_since(self):
363
        object_info = self.get_object_info(self.cname, self.oname)
364
        last_modified = object_info['Last-Modified']
365
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
366
        t = t + datetime.timedelta(seconds=1)
367
        t_formats = map(t.strftime, DATE_FORMATS)
368

    
369
        for tf in t_formats:
370
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
371
            self.assertEqual(r.status_code, 200)
372
            self.assertEqual(r.content, self.odata)
373

    
374
    def test_if_unmodified_since_precondition_failed(self):
375
        object_info = self.get_object_info(self.cname, self.oname)
376
        last_modified = object_info['Last-Modified']
377
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
378
        t = t - datetime.timedelta(seconds=1)
379
        t_formats = map(t.strftime, DATE_FORMATS)
380

    
381
        for tf in t_formats:
382
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
383
            self.assertEqual(r.status_code, 412)
384

    
385
    def test_hashes(self):
386
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
387
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
388
        size = len(odata)
389

    
390
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
391
        r = self.get('%s?format=json&hashmap' % view_url)
392
        self.assertEqual(r.status_code, 200)
393
        body = json.loads(r.content)
394

    
395
        hashes = body['hashes']
396
        block_size = body['block_size']
397
        block_num = size / block_size if size / block_size == 0 else\
398
            size / block_size + 1
399
        self.assertTrue(len(hashes), block_num)
400
        i = 0
401
        for h in hashes:
402
            start = i * block_size
403
            end = (i + 1) * block_size
404
            hash = merkle(odata[start:end])
405
            self.assertEqual(h, hash)
406
            i += 1