Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 1e47e49d

History | View | Annotate | Download (18 kB)

1 95b36144 Sofia Papagiannaki
#!/usr/bin/env python
2 95b36144 Sofia Papagiannaki
#coding=utf8
3 95b36144 Sofia Papagiannaki
4 95b36144 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 95b36144 Sofia Papagiannaki
#
6 95b36144 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 95b36144 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 95b36144 Sofia Papagiannaki
# conditions are met:
9 95b36144 Sofia Papagiannaki
#
10 95b36144 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 95b36144 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 95b36144 Sofia Papagiannaki
#      disclaimer.
13 95b36144 Sofia Papagiannaki
#
14 95b36144 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 95b36144 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 95b36144 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 95b36144 Sofia Papagiannaki
#      provided with the distribution.
18 95b36144 Sofia Papagiannaki
#
19 95b36144 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 95b36144 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 95b36144 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 95b36144 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 95b36144 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 95b36144 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 95b36144 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 95b36144 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 95b36144 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 95b36144 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 95b36144 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 95b36144 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 95b36144 Sofia Papagiannaki
#
32 95b36144 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 95b36144 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 95b36144 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 95b36144 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 95b36144 Sofia Papagiannaki
37 95b36144 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
38 95b36144 Sofia Papagiannaki
from pithos.api.test import PithosAPITest, DATE_FORMATS
39 95b36144 Sofia Papagiannaki
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40 95b36144 Sofia Papagiannaki
from pithos.api.test.objects import merkle
41 95b36144 Sofia Papagiannaki
42 95b36144 Sofia Papagiannaki
from synnefo.lib.services import get_service_path
43 95b36144 Sofia Papagiannaki
from synnefo.lib import join_urls
44 95b36144 Sofia Papagiannaki
45 13bf6cd8 Sofia Papagiannaki
#from mock import patch
46 95b36144 Sofia Papagiannaki
from urllib import quote
47 95b36144 Sofia Papagiannaki
48 95b36144 Sofia Papagiannaki
import django.utils.simplejson as json
49 95b36144 Sofia Papagiannaki
50 95b36144 Sofia Papagiannaki
import re
51 95b36144 Sofia Papagiannaki
import datetime
52 95b36144 Sofia Papagiannaki
import time as _time
53 95b36144 Sofia Papagiannaki
import random
54 13bf6cd8 Sofia Papagiannaki
import urllib
55 13bf6cd8 Sofia Papagiannaki
import urlparse
56 95b36144 Sofia Papagiannaki
57 95b36144 Sofia Papagiannaki
58 13bf6cd8 Sofia Papagiannaki
def add_url_params(url, **kwargs):
59 13bf6cd8 Sofia Papagiannaki
    if not kwargs:
60 13bf6cd8 Sofia Papagiannaki
        return url
61 13bf6cd8 Sofia Papagiannaki
    parts = list(urlparse.urlsplit(url))
62 13bf6cd8 Sofia Papagiannaki
    params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True))
63 13bf6cd8 Sofia Papagiannaki
    params.update(kwargs)
64 13bf6cd8 Sofia Papagiannaki
    parts[3] = urllib.urlencode(params)
65 13bf6cd8 Sofia Papagiannaki
    return urlparse.urlunsplit(parts)
66 13bf6cd8 Sofia Papagiannaki
67 3dce76b5 Sofia Papagiannaki
68 13bf6cd8 Sofia Papagiannaki
class NotAllowedView(PithosAPITest):
69 3dce76b5 Sofia Papagiannaki
    def test_not_allowed(self):
70 3dce76b5 Sofia Papagiannaki
        self.view_path = join_urls(get_service_path(
71 3dce76b5 Sofia Papagiannaki
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
72 3dce76b5 Sofia Papagiannaki
        self.view_url = join_urls(self.view_path, self.user, get_random_name(),
73 3dce76b5 Sofia Papagiannaki
                                  get_random_name())
74 3dce76b5 Sofia Papagiannaki
75 3dce76b5 Sofia Papagiannaki
        r = self.delete(self.view_url)
76 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
77 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
78 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
79 3dce76b5 Sofia Papagiannaki
80 3dce76b5 Sofia Papagiannaki
        r = self.post(self.view_url)
81 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
82 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
83 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
84 3dce76b5 Sofia Papagiannaki
85 3dce76b5 Sofia Papagiannaki
        r = self.put(self.view_url)
86 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
87 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
88 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
89 3dce76b5 Sofia Papagiannaki
90 3dce76b5 Sofia Papagiannaki
        r = self.copy(self.view_url)
91 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
92 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
93 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
94 3dce76b5 Sofia Papagiannaki
95 3dce76b5 Sofia Papagiannaki
        r = self.move(self.view_url)
96 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
97 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
98 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
99 3dce76b5 Sofia Papagiannaki
100 3dce76b5 Sofia Papagiannaki
101 95b36144 Sofia Papagiannaki
class ObjectGetView(PithosAPITest):
102 95b36144 Sofia Papagiannaki
    def setUp(self):
103 95b36144 Sofia Papagiannaki
        PithosAPITest.setUp(self)
104 95b36144 Sofia Papagiannaki
        self.cname = self.create_container()[0]
105 95b36144 Sofia Papagiannaki
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
106 95b36144 Sofia Papagiannaki
107 95b36144 Sofia Papagiannaki
        self.view_path = join_urls(get_service_path(
108 95b36144 Sofia Papagiannaki
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
109 95b36144 Sofia Papagiannaki
        self.view_url = join_urls(self.view_path, self.user, self.cname,
110 95b36144 Sofia Papagiannaki
                                  self.oname)
111 95b36144 Sofia Papagiannaki
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
112 95b36144 Sofia Papagiannaki
                                 self.oname)
113 95b36144 Sofia Papagiannaki
114 13bf6cd8 Sofia Papagiannaki
    def view(self, url, access_token='valid_token', user='user', *args,
115 13bf6cd8 Sofia Papagiannaki
             **kwargs):
116 13bf6cd8 Sofia Papagiannaki
117 13bf6cd8 Sofia Papagiannaki
        params = {}
118 13bf6cd8 Sofia Papagiannaki
        if access_token is not None:
119 13bf6cd8 Sofia Papagiannaki
            params['access_token'] = access_token
120 13bf6cd8 Sofia Papagiannaki
        return self.get(add_url_params(url, **params), user, *args,
121 13bf6cd8 Sofia Papagiannaki
                        **kwargs)
122 95b36144 Sofia Papagiannaki
123 13bf6cd8 Sofia Papagiannaki
    def test_no_authorization_granted(self):
124 13bf6cd8 Sofia Papagiannaki
        r = self.get(self.view_url)
125 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 302)
126 95b36144 Sofia Papagiannaki
        self.assertTrue('Location' in r)
127 13bf6cd8 Sofia Papagiannaki
        p = urlparse.urlparse(r['Location'])
128 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(p.netloc, 'testserver')
129 f4f948c0 Sofia Papagiannaki
        self.assertEqual(p.path, '/astakos/oauth2/auth')
130 13bf6cd8 Sofia Papagiannaki
131 13bf6cd8 Sofia Papagiannaki
        r = self.get(add_url_params(self.view_url, code='valid_code'),
132 13bf6cd8 Sofia Papagiannaki
                     follow=True)
133 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
134 13bf6cd8 Sofia Papagiannaki
        self.assertTrue(r.content, self.odata)
135 13bf6cd8 Sofia Papagiannaki
        intermidiate_url = r.redirect_chain[0][0]
136 13bf6cd8 Sofia Papagiannaki
        p = urlparse.urlparse(intermidiate_url)
137 13bf6cd8 Sofia Papagiannaki
        params = urlparse.parse_qs(p.query)
138 13bf6cd8 Sofia Papagiannaki
        self.assertTrue('access_token' in params)
139 13bf6cd8 Sofia Papagiannaki
140 13bf6cd8 Sofia Papagiannaki
        r = self.get(add_url_params(self.view_url, access_token='valid_token'))
141 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
142 13bf6cd8 Sofia Papagiannaki
        self.assertTrue(r.content, self.odata)
143 13bf6cd8 Sofia Papagiannaki
144 13bf6cd8 Sofia Papagiannaki
    def test_forbidden(self):
145 13bf6cd8 Sofia Papagiannaki
        container = self.create_container(user='alice')[0]
146 13bf6cd8 Sofia Papagiannaki
        obj = self.upload_object(container, user='alice')[0]
147 13bf6cd8 Sofia Papagiannaki
148 13bf6cd8 Sofia Papagiannaki
        url = join_urls(self.view_path, 'alice', container, obj)
149 13bf6cd8 Sofia Papagiannaki
        r = self.view(url)
150 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
151 13bf6cd8 Sofia Papagiannaki
152 13bf6cd8 Sofia Papagiannaki
    def test_shared_with_me(self):
153 13bf6cd8 Sofia Papagiannaki
        container = self.create_container(user='alice')[0]
154 13bf6cd8 Sofia Papagiannaki
        obj, data = self.upload_object(container, user='alice')[:-1]
155 13bf6cd8 Sofia Papagiannaki
156 13bf6cd8 Sofia Papagiannaki
        # share object
157 13bf6cd8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', container, obj)
158 13bf6cd8 Sofia Papagiannaki
        self.post(url, user='alice', content_type='',
159 13bf6cd8 Sofia Papagiannaki
                  HTTP_CONTENT_RANGE='bytes */*',
160 13bf6cd8 Sofia Papagiannaki
                  HTTP_X_OBJECT_SHARING='read=user')
161 13bf6cd8 Sofia Papagiannaki
162 13bf6cd8 Sofia Papagiannaki
        url = join_urls(self.view_path, 'alice', container, obj)
163 13bf6cd8 Sofia Papagiannaki
        r = self.view(url)
164 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
165 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.content, data)
166 13bf6cd8 Sofia Papagiannaki
167 13bf6cd8 Sofia Papagiannaki
    def test_view(self):
168 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url)
169 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
170 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
171 13bf6cd8 Sofia Papagiannaki
172 13bf6cd8 Sofia Papagiannaki
    def test_not_existing(self):
173 13bf6cd8 Sofia Papagiannaki
        url = self.view_url[:-1]
174 13bf6cd8 Sofia Papagiannaki
        r = self.view(url)
175 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
176 95b36144 Sofia Papagiannaki
177 95b36144 Sofia Papagiannaki
    def test_versions(self):
178 95b36144 Sofia Papagiannaki
        c = self.cname
179 95b36144 Sofia Papagiannaki
        o = self.oname
180 95b36144 Sofia Papagiannaki
181 95b36144 Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
182 95b36144 Sofia Papagiannaki
        r = self.post(self.api_url, content_type='', **meta)
183 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
184 95b36144 Sofia Papagiannaki
185 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?version=list&format=json' % self.view_url)
186 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
187 95b36144 Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
188 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
189 95b36144 Sofia Papagiannaki
190 95b36144 Sofia Papagiannaki
        # update meta
191 95b36144 Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
192 95b36144 Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
193 95b36144 Sofia Papagiannaki
        r = self.post(self.api_url, content_type='', **meta)
194 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
195 95b36144 Sofia Papagiannaki
196 95b36144 Sofia Papagiannaki
        # assert a newly created version has been created
197 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?version=list&format=json' % self.view_url)
198 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
199 95b36144 Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
200 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
201 95b36144 Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
202 95b36144 Sofia Papagiannaki
203 95b36144 Sofia Papagiannaki
        vserial, _ = l2[-2]
204 95b36144 Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
205 6ee6677e Sofia Papagiannaki
                         {'Quality': 'AAA'})
206 95b36144 Sofia Papagiannaki
207 95b36144 Sofia Papagiannaki
        # update data
208 95b36144 Sofia Papagiannaki
        self.append_object_data(c, o)
209 95b36144 Sofia Papagiannaki
210 95b36144 Sofia Papagiannaki
        # assert a newly created version has been created
211 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?version=list&format=json' % self.view_url)
212 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
213 95b36144 Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
214 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
215 95b36144 Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
216 95b36144 Sofia Papagiannaki
217 95b36144 Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
218 95b36144 Sofia Papagiannaki
        cname = self.cname
219 95b36144 Sofia Papagiannaki
220 13bf6cd8 Sofia Papagiannaki
        r = self.view(quote('%s ' % self.view_url))
221 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
222 95b36144 Sofia Papagiannaki
223 95b36144 Sofia Papagiannaki
        # delete object
224 95b36144 Sofia Papagiannaki
        self.delete(self.api_url)
225 95b36144 Sofia Papagiannaki
226 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url)
227 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
228 95b36144 Sofia Papagiannaki
229 95b36144 Sofia Papagiannaki
        # upload object with trailing space
230 95b36144 Sofia Papagiannaki
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
231 95b36144 Sofia Papagiannaki
232 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, cname, oname)
233 13bf6cd8 Sofia Papagiannaki
        r = self.view(view_url)
234 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
235 95b36144 Sofia Papagiannaki
236 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
237 13bf6cd8 Sofia Papagiannaki
        r = self.view(view_url)
238 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
239 95b36144 Sofia Papagiannaki
240 95b36144 Sofia Papagiannaki
    def test_get_partial(self):
241 95b36144 Sofia Papagiannaki
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
242 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
243 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
244 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata[:limit + 1])
245 95b36144 Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
246 95b36144 Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
247 95b36144 Sofia Papagiannaki
            limit, len(self.odata)))
248 95b36144 Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
249 95b36144 Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
250 95b36144 Sofia Papagiannaki
251 95b36144 Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
252 95b36144 Sofia Papagiannaki
        # TODO
253 13bf6cd8 Sofia Papagiannaki
        #r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
254 95b36144 Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
255 95b36144 Sofia Papagiannaki
256 95b36144 Sofia Papagiannaki
        offset = len(self.odata) + 1
257 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
258 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
259 95b36144 Sofia Papagiannaki
260 95b36144 Sofia Papagiannaki
    def test_multiple_range(self):
261 95b36144 Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
262 95b36144 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
263 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE=ranges)
264 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
265 95b36144 Sofia Papagiannaki
        self.assertTrue('content-type' in r)
266 95b36144 Sofia Papagiannaki
        p = re.compile(
267 95b36144 Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
268 95b36144 Sofia Papagiannaki
            re.I)
269 95b36144 Sofia Papagiannaki
        m = p.match(r['content-type'])
270 95b36144 Sofia Papagiannaki
        if m is None:
271 95b36144 Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
272 95b36144 Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
273 95b36144 Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
274 95b36144 Sofia Papagiannaki
275 95b36144 Sofia Papagiannaki
        # assert content parts length
276 95b36144 Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
277 95b36144 Sofia Papagiannaki
278 95b36144 Sofia Papagiannaki
        # for each content part assert headers
279 95b36144 Sofia Papagiannaki
        i = 0
280 95b36144 Sofia Papagiannaki
        for cpart in cparts:
281 95b36144 Sofia Papagiannaki
            content = cpart.split('\r\n')
282 95b36144 Sofia Papagiannaki
            headers = content[1:3]
283 95b36144 Sofia Papagiannaki
            content_range = headers[0].split(': ')
284 95b36144 Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
285 95b36144 Sofia Papagiannaki
286 95b36144 Sofia Papagiannaki
            r = l[i].split('-')
287 95b36144 Sofia Papagiannaki
            if not r[0] and not r[1]:
288 95b36144 Sofia Papagiannaki
                pass
289 95b36144 Sofia Papagiannaki
            elif not r[0]:
290 95b36144 Sofia Papagiannaki
                start = len(self.odata) - int(r[1])
291 95b36144 Sofia Papagiannaki
                end = len(self.odata)
292 95b36144 Sofia Papagiannaki
            elif not r[1]:
293 95b36144 Sofia Papagiannaki
                start = int(r[0])
294 95b36144 Sofia Papagiannaki
                end = len(self.odata)
295 95b36144 Sofia Papagiannaki
            else:
296 95b36144 Sofia Papagiannaki
                start = int(r[0])
297 95b36144 Sofia Papagiannaki
                end = int(r[1]) + 1
298 95b36144 Sofia Papagiannaki
            fdata = self.odata[start:end]
299 95b36144 Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
300 95b36144 Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
301 95b36144 Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
302 95b36144 Sofia Papagiannaki
            i += 1
303 95b36144 Sofia Papagiannaki
304 95b36144 Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
305 95b36144 Sofia Papagiannaki
        # perform get with multiple range
306 95b36144 Sofia Papagiannaki
        out_of_range = len(self.odata) + 1
307 95b36144 Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
308 95b36144 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
309 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE=ranges)
310 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
311 95b36144 Sofia Papagiannaki
312 95b36144 Sofia Papagiannaki
    def test_get_if_match(self):
313 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
314 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
315 95b36144 Sofia Papagiannaki
        else:
316 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
317 95b36144 Sofia Papagiannaki
318 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH=etag)
319 95b36144 Sofia Papagiannaki
320 95b36144 Sofia Papagiannaki
        # assert get success
321 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
322 95b36144 Sofia Papagiannaki
323 95b36144 Sofia Papagiannaki
        # assert response content
324 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
325 95b36144 Sofia Papagiannaki
326 95b36144 Sofia Papagiannaki
    def test_get_if_match_star(self):
327 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH='*')
328 95b36144 Sofia Papagiannaki
329 95b36144 Sofia Papagiannaki
        # assert get success
330 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
331 95b36144 Sofia Papagiannaki
332 95b36144 Sofia Papagiannaki
        # assert response content
333 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
334 95b36144 Sofia Papagiannaki
335 95b36144 Sofia Papagiannaki
    def test_get_multiple_if_match(self):
336 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
337 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
338 95b36144 Sofia Papagiannaki
        else:
339 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
340 95b36144 Sofia Papagiannaki
341 95b36144 Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
342 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
343 95b36144 Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
344 95b36144 Sofia Papagiannaki
345 95b36144 Sofia Papagiannaki
        # assert get success
346 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
347 95b36144 Sofia Papagiannaki
348 95b36144 Sofia Papagiannaki
        # assert response content
349 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
350 95b36144 Sofia Papagiannaki
351 95b36144 Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
352 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
353 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
354 95b36144 Sofia Papagiannaki
355 95b36144 Sofia Papagiannaki
    def test_if_none_match(self):
356 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
357 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
358 95b36144 Sofia Papagiannaki
        else:
359 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
360 95b36144 Sofia Papagiannaki
361 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match
362 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
363 95b36144 Sofia Papagiannaki
364 95b36144 Sofia Papagiannaki
        # assert precondition_failed
365 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
366 95b36144 Sofia Papagiannaki
367 95b36144 Sofia Papagiannaki
        # update object data
368 95b36144 Sofia Papagiannaki
        r = self.append_object_data(self.cname, self.oname)[-1]
369 95b36144 Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
370 95b36144 Sofia Papagiannaki
371 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match
372 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
373 95b36144 Sofia Papagiannaki
374 95b36144 Sofia Papagiannaki
        # assert get success
375 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
376 95b36144 Sofia Papagiannaki
377 95b36144 Sofia Papagiannaki
    def test_if_none_match_star(self):
378 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match with star
379 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
380 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
381 95b36144 Sofia Papagiannaki
382 95b36144 Sofia Papagiannaki
    def test_if_modified_since(self):
383 95b36144 Sofia Papagiannaki
        # upload object
384 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
385 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
386 95b36144 Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
387 95b36144 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
388 95b36144 Sofia Papagiannaki
389 95b36144 Sofia Papagiannaki
        # Check not modified since
390 95b36144 Sofia Papagiannaki
        for t in t1_formats:
391 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
392 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
393 95b36144 Sofia Papagiannaki
394 95b36144 Sofia Papagiannaki
        _time.sleep(1)
395 95b36144 Sofia Papagiannaki
396 95b36144 Sofia Papagiannaki
        # update object data
397 95b36144 Sofia Papagiannaki
        appended_data = self.append_object_data(self.cname, self.oname)[1]
398 95b36144 Sofia Papagiannaki
399 95b36144 Sofia Papagiannaki
        # Check modified since
400 95b36144 Sofia Papagiannaki
        for t in t1_formats:
401 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
402 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
403 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata + appended_data)
404 95b36144 Sofia Papagiannaki
405 95b36144 Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
406 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
407 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
408 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
409 95b36144 Sofia Papagiannaki
410 95b36144 Sofia Papagiannaki
    def test_if_not_modified_since(self):
411 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
412 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
413 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
414 95b36144 Sofia Papagiannaki
415 95b36144 Sofia Papagiannaki
        # Check unmodified
416 95b36144 Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
417 95b36144 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
418 95b36144 Sofia Papagiannaki
        for t in t1_formats:
419 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
420 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
421 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata)
422 95b36144 Sofia Papagiannaki
423 95b36144 Sofia Papagiannaki
        # modify object
424 95b36144 Sofia Papagiannaki
        _time.sleep(2)
425 95b36144 Sofia Papagiannaki
        self.append_object_data(self.cname, self.oname)
426 95b36144 Sofia Papagiannaki
427 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
428 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
429 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
430 95b36144 Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
431 95b36144 Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
432 95b36144 Sofia Papagiannaki
433 95b36144 Sofia Papagiannaki
        # check modified
434 95b36144 Sofia Papagiannaki
        for t in t2_formats:
435 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
436 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
437 95b36144 Sofia Papagiannaki
438 95b36144 Sofia Papagiannaki
        # modify account: update object meta
439 95b36144 Sofia Papagiannaki
        _time.sleep(1)
440 95b36144 Sofia Papagiannaki
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
441 95b36144 Sofia Papagiannaki
442 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
443 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
444 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
445 95b36144 Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
446 95b36144 Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
447 95b36144 Sofia Papagiannaki
448 95b36144 Sofia Papagiannaki
        # check modified
449 95b36144 Sofia Papagiannaki
        for t in t3_formats:
450 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
451 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
452 95b36144 Sofia Papagiannaki
453 95b36144 Sofia Papagiannaki
    def test_if_unmodified_since(self):
454 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
455 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
456 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
457 95b36144 Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
458 95b36144 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
459 95b36144 Sofia Papagiannaki
460 95b36144 Sofia Papagiannaki
        for tf in t_formats:
461 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
462 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
463 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata)
464 95b36144 Sofia Papagiannaki
465 95b36144 Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
466 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
467 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
468 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
469 95b36144 Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
470 95b36144 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
471 95b36144 Sofia Papagiannaki
472 95b36144 Sofia Papagiannaki
        for tf in t_formats:
473 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
474 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
475 95b36144 Sofia Papagiannaki
476 95b36144 Sofia Papagiannaki
    def test_hashes(self):
477 95b36144 Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
478 95b36144 Sofia Papagiannaki
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
479 95b36144 Sofia Papagiannaki
        size = len(odata)
480 95b36144 Sofia Papagiannaki
481 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
482 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?format=json&hashmap' % view_url)
483 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
484 95b36144 Sofia Papagiannaki
        body = json.loads(r.content)
485 95b36144 Sofia Papagiannaki
486 95b36144 Sofia Papagiannaki
        hashes = body['hashes']
487 95b36144 Sofia Papagiannaki
        block_size = body['block_size']
488 95b36144 Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
489 95b36144 Sofia Papagiannaki
            size / block_size + 1
490 95b36144 Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
491 95b36144 Sofia Papagiannaki
        i = 0
492 95b36144 Sofia Papagiannaki
        for h in hashes:
493 95b36144 Sofia Papagiannaki
            start = i * block_size
494 95b36144 Sofia Papagiannaki
            end = (i + 1) * block_size
495 95b36144 Sofia Papagiannaki
            hash = merkle(odata[start:end])
496 95b36144 Sofia Papagiannaki
            self.assertEqual(h, hash)
497 95b36144 Sofia Papagiannaki
            i += 1