Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 381a548c

History | View | Annotate | Download (17.4 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api import settings as pithos_settings
38
from pithos.api.test import PithosAPITest, DATE_FORMATS
39
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40
from pithos.api.test.objects import merkle
41

    
42
from synnefo.lib.services import get_service_path
43
from synnefo.lib import join_urls
44

    
45
from mock import patch
46
from urllib import quote
47
from urlparse import urlsplit, parse_qs
48

    
49
import django.utils.simplejson as json
50

    
51
import re
52
import datetime
53
import time as _time
54
import random
55

    
56

    
57
class NotAllowedView(PithosAPITest):
58
    def head(self, url, *args, **kwargs):
59
        with patch("pithos.api.util.get_token_from_cookie") as m:
60
            m.return_value = 'token'
61
            return super(NotAllowedView, self).head(url, *args, **kwargs)
62

    
63
    def delete(self, url, *args, **kwargs):
64
        with patch("pithos.api.util.get_token_from_cookie") as m:
65
            m.return_value = 'token'
66
            return super(NotAllowedView, self).delete(url, *args, **kwargs)
67

    
68
    def post(self, url, *args, **kwargs):
69
        with patch("pithos.api.util.get_token_from_cookie") as m:
70
            m.return_value = 'token'
71
            return super(NotAllowedView, self).post(url, *args, **kwargs)
72

    
73
    def put(self, url, *args, **kwargs):
74
        with patch("pithos.api.util.get_token_from_cookie") as m:
75
            m.return_value = 'token'
76
            return super(NotAllowedView, self).put(url, *args, **kwargs)
77

    
78
    def copy(self, url, *args, **kwargs):
79
        with patch("pithos.api.util.get_token_from_cookie") as m:
80
            m.return_value = 'token'
81
            return super(NotAllowedView, self).copy(url, *args, **kwargs)
82

    
83
    def move(self, url, *args, **kwargs):
84
        with patch("pithos.api.util.get_token_from_cookie") as m:
85
            m.return_value = 'token'
86
            return super(NotAllowedView, self).move(url, *args, **kwargs)
87

    
88
    def test_not_allowed(self):
89
        self.view_path = join_urls(get_service_path(
90
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
91
        self.view_url = join_urls(self.view_path, self.user, get_random_name(),
92
                                  get_random_name())
93

    
94
        r = self.head(self.view_url)
95
        self.assertEqual(r.status_code, 405)
96
        self.assertTrue('Allow' in r)
97
        self.assertEqual(r['Allow'],  'GET')
98

    
99
        r = self.delete(self.view_url)
100
        self.assertEqual(r.status_code, 405)
101
        self.assertTrue('Allow' in r)
102
        self.assertEqual(r['Allow'],  'GET')
103

    
104
        r = self.post(self.view_url)
105
        self.assertEqual(r.status_code, 405)
106
        self.assertTrue('Allow' in r)
107
        self.assertEqual(r['Allow'],  'GET')
108

    
109
        r = self.put(self.view_url)
110
        self.assertEqual(r.status_code, 405)
111
        self.assertTrue('Allow' in r)
112
        self.assertEqual(r['Allow'],  'GET')
113

    
114
        r = self.copy(self.view_url)
115
        self.assertEqual(r.status_code, 405)
116
        self.assertTrue('Allow' in r)
117
        self.assertEqual(r['Allow'],  'GET')
118

    
119
        r = self.move(self.view_url)
120
        self.assertEqual(r.status_code, 405)
121
        self.assertTrue('Allow' in r)
122
        self.assertEqual(r['Allow'],  'GET')
123

    
124

    
125
class ObjectGetView(PithosAPITest):
126
    def setUp(self):
127
        PithosAPITest.setUp(self)
128
        self.cname = self.create_container()[0]
129
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
130

    
131
        self.view_path = join_urls(get_service_path(
132
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
133
        self.view_url = join_urls(self.view_path, self.user, self.cname,
134
                                  self.oname)
135
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
136
                                 self.oname)
137

    
138
    def get(self, url, user='user', *args, **kwargs):
139
        with patch("pithos.api.util.get_token_from_cookie") as m:
140
            m.return_value = 'token'
141
            return super(ObjectGetView, self).get(url, user='user', *args,
142
                                                  **kwargs)
143

    
144
    def test_no_cookie_redirect(self):
145
        r = super(ObjectGetView, self).get(self.view_url)
146
        self.assertEqual(r.status_code, 302)
147
        self.assertTrue('Location' in r)
148
        parts = list(urlsplit(r['Location']))
149
        qs = parse_qs(parts[3])
150
        self.assertTrue('next' in qs)
151
        self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST,
152
                                                  self.view_url))
153

    
154
    def test_versions(self):
155
        c = self.cname
156
        o = self.oname
157

    
158
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
159
        r = self.post(self.api_url, content_type='', **meta)
160
        self.assertEqual(r.status_code, 202)
161

    
162
        r = self.get('%s?version=list&format=json' % self.view_url)
163
        self.assertEqual(r.status_code, 200)
164
        l1 = json.loads(r.content)['versions']
165
        self.assertEqual(len(l1), 2)
166

    
167
        # update meta
168
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
169
                'HTTP_X_OBJECT_META_STOCK': 'True'}
170
        r = self.post(self.api_url, content_type='', **meta)
171
        self.assertEqual(r.status_code, 202)
172

    
173
        # assert a newly created version has been created
174
        r = self.get('%s?version=list&format=json' % self.view_url)
175
        self.assertEqual(r.status_code, 200)
176
        l2 = json.loads(r.content)['versions']
177
        self.assertEqual(len(l2), len(l1) + 1)
178
        self.assertEqual(l2[:-1], l1)
179

    
180
        vserial, _ = l2[-2]
181
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
182
                         {'Quality': 'AAA'})
183

    
184
        # update data
185
        self.append_object_data(c, o)
186

    
187
        # assert a newly created version has been created
188
        r = self.get('%s?version=list&format=json' % self.view_url)
189
        self.assertEqual(r.status_code, 200)
190
        l3 = json.loads(r.content)['versions']
191
        self.assertEqual(len(l3), len(l2) + 1)
192
        self.assertEqual(l3[:-1], l2)
193

    
194
    def test_objects_with_trailing_spaces(self):
195
        cname = self.cname
196

    
197
        r = self.get(quote('%s ' % self.view_url))
198
        self.assertEqual(r.status_code, 404)
199

    
200
        # delete object
201
        self.delete(self.api_url)
202

    
203
        r = self.get(self.view_url)
204
        self.assertEqual(r.status_code, 404)
205

    
206
        # upload object with trailing space
207
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
208

    
209
        view_url = join_urls(self.view_path, self.user, cname, oname)
210
        r = self.get(view_url)
211
        self.assertEqual(r.status_code, 200)
212

    
213
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
214
        r = self.get(view_url)
215
        self.assertEqual(r.status_code, 404)
216

    
217
    def test_get_partial(self):
218
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
219
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
220
        self.assertEqual(r.status_code, 206)
221
        self.assertEqual(r.content, self.odata[:limit + 1])
222
        self.assertTrue('Content-Range' in r)
223
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
224
            limit, len(self.odata)))
225
        self.assertTrue('Content-Type' in r)
226
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
227

    
228
    def test_get_range_not_satisfiable(self):
229
        # TODO
230
        #r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
231
        #self.assertEqual(r.status_code, 416)
232

    
233
        offset = len(self.odata) + 1
234
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
235
        self.assertEqual(r.status_code, 416)
236

    
237
    def test_multiple_range(self):
238
        l = ['0-499', '-500', '1000-']
239
        ranges = 'bytes=%s' % ','.join(l)
240
        r = self.get(self.view_url, HTTP_RANGE=ranges)
241
        self.assertEqual(r.status_code, 206)
242
        self.assertTrue('content-type' in r)
243
        p = re.compile(
244
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
245
            re.I)
246
        m = p.match(r['content-type'])
247
        if m is None:
248
            self.fail('Invalid multiple range content type')
249
        boundary = m.groupdict()['boundary']
250
        cparts = r.content.split('--%s' % boundary)[1:-1]
251

    
252
        # assert content parts length
253
        self.assertEqual(len(cparts), len(l))
254

    
255
        # for each content part assert headers
256
        i = 0
257
        for cpart in cparts:
258
            content = cpart.split('\r\n')
259
            headers = content[1:3]
260
            content_range = headers[0].split(': ')
261
            self.assertEqual(content_range[0], 'Content-Range')
262

    
263
            r = l[i].split('-')
264
            if not r[0] and not r[1]:
265
                pass
266
            elif not r[0]:
267
                start = len(self.odata) - int(r[1])
268
                end = len(self.odata)
269
            elif not r[1]:
270
                start = int(r[0])
271
                end = len(self.odata)
272
            else:
273
                start = int(r[0])
274
                end = int(r[1]) + 1
275
            fdata = self.odata[start:end]
276
            sdata = '\r\n'.join(content[4:-1])
277
            self.assertEqual(len(fdata), len(sdata))
278
            self.assertEquals(fdata, sdata)
279
            i += 1
280

    
281
    def test_multiple_range_not_satisfiable(self):
282
        # perform get with multiple range
283
        out_of_range = len(self.odata) + 1
284
        l = ['0-499', '-500', '%d-' % out_of_range]
285
        ranges = 'bytes=%s' % ','.join(l)
286
        r = self.get(self.view_url, HTTP_RANGE=ranges)
287
        self.assertEqual(r.status_code, 416)
288

    
289
    def test_get_if_match(self):
290
        if pithos_settings.UPDATE_MD5:
291
            etag = md5_hash(self.odata)
292
        else:
293
            etag = merkle(self.odata)
294

    
295
        r = self.get(self.view_url, HTTP_IF_MATCH=etag)
296

    
297
        # assert get success
298
        self.assertEqual(r.status_code, 200)
299

    
300
        # assert response content
301
        self.assertEqual(r.content, self.odata)
302

    
303
    def test_get_if_match_star(self):
304
        r = self.get(self.view_url, HTTP_IF_MATCH='*')
305

    
306
        # assert get success
307
        self.assertEqual(r.status_code, 200)
308

    
309
        # assert response content
310
        self.assertEqual(r.content, self.odata)
311

    
312
    def test_get_multiple_if_match(self):
313
        if pithos_settings.UPDATE_MD5:
314
            etag = md5_hash(self.odata)
315
        else:
316
            etag = merkle(self.odata)
317

    
318
        quoted = lambda s: '"%s"' % s
319
        r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
320
            [quoted(etag), quoted(get_random_data(64))]))
321

    
322
        # assert get success
323
        self.assertEqual(r.status_code, 200)
324

    
325
        # assert response content
326
        self.assertEqual(r.content, self.odata)
327

    
328
    def test_if_match_precondition_failed(self):
329
        r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
330
        self.assertEqual(r.status_code, 412)
331

    
332
    def test_if_none_match(self):
333
        if pithos_settings.UPDATE_MD5:
334
            etag = md5_hash(self.odata)
335
        else:
336
            etag = merkle(self.odata)
337

    
338
        # perform get with If-None-Match
339
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
340

    
341
        # assert precondition_failed
342
        self.assertEqual(r.status_code, 304)
343

    
344
        # update object data
345
        r = self.append_object_data(self.cname, self.oname)[-1]
346
        self.assertTrue(etag != r['ETag'])
347

    
348
        # perform get with If-None-Match
349
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
350

    
351
        # assert get success
352
        self.assertEqual(r.status_code, 200)
353

    
354
    def test_if_none_match_star(self):
355
        # perform get with If-None-Match with star
356
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
357
        self.assertEqual(r.status_code, 304)
358

    
359
    def test_if_modified_since(self):
360
        # upload object
361
        object_info = self.get_object_info(self.cname, self.oname)
362
        last_modified = object_info['Last-Modified']
363
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
364
        t1_formats = map(t1.strftime, DATE_FORMATS)
365

    
366
        # Check not modified since
367
        for t in t1_formats:
368
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
369
            self.assertEqual(r.status_code, 304)
370

    
371
        _time.sleep(1)
372

    
373
        # update object data
374
        appended_data = self.append_object_data(self.cname, self.oname)[1]
375

    
376
        # Check modified since
377
        for t in t1_formats:
378
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
379
            self.assertEqual(r.status_code, 200)
380
            self.assertEqual(r.content, self.odata + appended_data)
381

    
382
    def test_if_modified_since_invalid_date(self):
383
        r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
384
        self.assertEqual(r.status_code, 200)
385
        self.assertEqual(r.content, self.odata)
386

    
387
    def test_if_not_modified_since(self):
388
        object_info = self.get_object_info(self.cname, self.oname)
389
        last_modified = object_info['Last-Modified']
390
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
391

    
392
        # Check unmodified
393
        t1 = t + datetime.timedelta(seconds=1)
394
        t1_formats = map(t1.strftime, DATE_FORMATS)
395
        for t in t1_formats:
396
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
397
            self.assertEqual(r.status_code, 200)
398
            self.assertEqual(r.content, self.odata)
399

    
400
        # modify object
401
        _time.sleep(2)
402
        self.append_object_data(self.cname, self.oname)
403

    
404
        object_info = self.get_object_info(self.cname, self.oname)
405
        last_modified = object_info['Last-Modified']
406
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
407
        t2 = t - datetime.timedelta(seconds=1)
408
        t2_formats = map(t2.strftime, DATE_FORMATS)
409

    
410
        # check modified
411
        for t in t2_formats:
412
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
413
            self.assertEqual(r.status_code, 412)
414

    
415
        # modify account: update object meta
416
        _time.sleep(1)
417
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
418

    
419
        object_info = self.get_object_info(self.cname, self.oname)
420
        last_modified = object_info['Last-Modified']
421
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
422
        t3 = t - datetime.timedelta(seconds=1)
423
        t3_formats = map(t3.strftime, DATE_FORMATS)
424

    
425
        # check modified
426
        for t in t3_formats:
427
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
428
            self.assertEqual(r.status_code, 412)
429

    
430
    def test_if_unmodified_since(self):
431
        object_info = self.get_object_info(self.cname, self.oname)
432
        last_modified = object_info['Last-Modified']
433
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
434
        t = t + datetime.timedelta(seconds=1)
435
        t_formats = map(t.strftime, DATE_FORMATS)
436

    
437
        for tf in t_formats:
438
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
439
            self.assertEqual(r.status_code, 200)
440
            self.assertEqual(r.content, self.odata)
441

    
442
    def test_if_unmodified_since_precondition_failed(self):
443
        object_info = self.get_object_info(self.cname, self.oname)
444
        last_modified = object_info['Last-Modified']
445
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
446
        t = t - datetime.timedelta(seconds=1)
447
        t_formats = map(t.strftime, DATE_FORMATS)
448

    
449
        for tf in t_formats:
450
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
451
            self.assertEqual(r.status_code, 412)
452

    
453
    def test_hashes(self):
454
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
455
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
456
        size = len(odata)
457

    
458
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
459
        r = self.get('%s?format=json&hashmap' % view_url)
460
        self.assertEqual(r.status_code, 200)
461
        body = json.loads(r.content)
462

    
463
        hashes = body['hashes']
464
        block_size = body['block_size']
465
        block_num = size / block_size if size / block_size == 0 else\
466
            size / block_size + 1
467
        self.assertTrue(len(hashes), block_num)
468
        i = 0
469
        for h in hashes:
470
            start = i * block_size
471
            end = (i + 1) * block_size
472
            hash = merkle(odata[start:end])
473
            self.assertEqual(h, hash)
474
            i += 1