Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 7e402b46

History | View | Annotate | Download (18.8 kB)

1 95b36144 Sofia Papagiannaki
#!/usr/bin/env python
2 95b36144 Sofia Papagiannaki
#coding=utf8
3 95b36144 Sofia Papagiannaki
4 95b36144 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 95b36144 Sofia Papagiannaki
#
6 95b36144 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 95b36144 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 95b36144 Sofia Papagiannaki
# conditions are met:
9 95b36144 Sofia Papagiannaki
#
10 95b36144 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 95b36144 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 95b36144 Sofia Papagiannaki
#      disclaimer.
13 95b36144 Sofia Papagiannaki
#
14 95b36144 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 95b36144 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 95b36144 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 95b36144 Sofia Papagiannaki
#      provided with the distribution.
18 95b36144 Sofia Papagiannaki
#
19 95b36144 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 95b36144 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 95b36144 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 95b36144 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 95b36144 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 95b36144 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 95b36144 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 95b36144 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 95b36144 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 95b36144 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 95b36144 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 95b36144 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 95b36144 Sofia Papagiannaki
#
32 95b36144 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 95b36144 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 95b36144 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 95b36144 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 95b36144 Sofia Papagiannaki
37 95b36144 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
38 95b36144 Sofia Papagiannaki
from pithos.api.test import PithosAPITest, DATE_FORMATS
39 95b36144 Sofia Papagiannaki
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40 95b36144 Sofia Papagiannaki
from pithos.api.test.objects import merkle
41 95b36144 Sofia Papagiannaki
42 95b36144 Sofia Papagiannaki
from synnefo.lib.services import get_service_path
43 95b36144 Sofia Papagiannaki
from synnefo.lib import join_urls
44 95b36144 Sofia Papagiannaki
45 13bf6cd8 Sofia Papagiannaki
#from mock import patch
46 95b36144 Sofia Papagiannaki
from urllib import quote
47 95b36144 Sofia Papagiannaki
48 95b36144 Sofia Papagiannaki
import django.utils.simplejson as json
49 95b36144 Sofia Papagiannaki
50 95b36144 Sofia Papagiannaki
import re
51 95b36144 Sofia Papagiannaki
import datetime
52 95b36144 Sofia Papagiannaki
import time as _time
53 95b36144 Sofia Papagiannaki
import random
54 13bf6cd8 Sofia Papagiannaki
import urllib
55 13bf6cd8 Sofia Papagiannaki
import urlparse
56 95b36144 Sofia Papagiannaki
57 95b36144 Sofia Papagiannaki
58 13bf6cd8 Sofia Papagiannaki
def add_url_params(url, **kwargs):
59 13bf6cd8 Sofia Papagiannaki
    if not kwargs:
60 13bf6cd8 Sofia Papagiannaki
        return url
61 13bf6cd8 Sofia Papagiannaki
    parts = list(urlparse.urlsplit(url))
62 13bf6cd8 Sofia Papagiannaki
    params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True))
63 13bf6cd8 Sofia Papagiannaki
    params.update(kwargs)
64 13bf6cd8 Sofia Papagiannaki
    parts[3] = urllib.urlencode(params)
65 13bf6cd8 Sofia Papagiannaki
    return urlparse.urlunsplit(parts)
66 13bf6cd8 Sofia Papagiannaki
67 3dce76b5 Sofia Papagiannaki
68 13bf6cd8 Sofia Papagiannaki
class NotAllowedView(PithosAPITest):
69 3dce76b5 Sofia Papagiannaki
    def test_not_allowed(self):
70 3dce76b5 Sofia Papagiannaki
        self.view_path = join_urls(get_service_path(
71 3dce76b5 Sofia Papagiannaki
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
72 3dce76b5 Sofia Papagiannaki
        self.view_url = join_urls(self.view_path, self.user, get_random_name(),
73 3dce76b5 Sofia Papagiannaki
                                  get_random_name())
74 3dce76b5 Sofia Papagiannaki
75 3dce76b5 Sofia Papagiannaki
        r = self.delete(self.view_url)
76 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
77 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
78 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
79 3dce76b5 Sofia Papagiannaki
80 3dce76b5 Sofia Papagiannaki
        r = self.post(self.view_url)
81 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
82 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
83 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
84 3dce76b5 Sofia Papagiannaki
85 3dce76b5 Sofia Papagiannaki
        r = self.put(self.view_url)
86 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
87 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
88 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
89 3dce76b5 Sofia Papagiannaki
90 3dce76b5 Sofia Papagiannaki
        r = self.copy(self.view_url)
91 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
92 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
93 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
94 3dce76b5 Sofia Papagiannaki
95 3dce76b5 Sofia Papagiannaki
        r = self.move(self.view_url)
96 2aba7764 Sofia Papagiannaki
        self.assertEqual(r.status_code, 405)
97 2aba7764 Sofia Papagiannaki
        self.assertTrue('Allow' in r)
98 7baa6079 Sofia Papagiannaki
        self.assertEqual(sorted(r['Allow'].split(', ')),  ['GET', 'HEAD'])
99 3dce76b5 Sofia Papagiannaki
100 3dce76b5 Sofia Papagiannaki
101 95b36144 Sofia Papagiannaki
class ObjectGetView(PithosAPITest):
102 95b36144 Sofia Papagiannaki
    def setUp(self):
103 95b36144 Sofia Papagiannaki
        PithosAPITest.setUp(self)
104 95b36144 Sofia Papagiannaki
        self.cname = self.create_container()[0]
105 77b8a8e3 Sofia Papagiannaki
        self.oname, self.odata = self.upload_object(self.cname,
106 77b8a8e3 Sofia Papagiannaki
                                                    'φωτογραφία.JPG')[:-1]
107 95b36144 Sofia Papagiannaki
108 95b36144 Sofia Papagiannaki
        self.view_path = join_urls(get_service_path(
109 95b36144 Sofia Papagiannaki
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
110 95b36144 Sofia Papagiannaki
        self.view_url = join_urls(self.view_path, self.user, self.cname,
111 95b36144 Sofia Papagiannaki
                                  self.oname)
112 95b36144 Sofia Papagiannaki
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
113 95b36144 Sofia Papagiannaki
                                 self.oname)
114 95b36144 Sofia Papagiannaki
115 13bf6cd8 Sofia Papagiannaki
    def view(self, url, access_token='valid_token', user='user', *args,
116 13bf6cd8 Sofia Papagiannaki
             **kwargs):
117 13bf6cd8 Sofia Papagiannaki
118 13bf6cd8 Sofia Papagiannaki
        params = {}
119 13bf6cd8 Sofia Papagiannaki
        if access_token is not None:
120 13bf6cd8 Sofia Papagiannaki
            params['access_token'] = access_token
121 13bf6cd8 Sofia Papagiannaki
        return self.get(add_url_params(url, **params), user, *args,
122 13bf6cd8 Sofia Papagiannaki
                        **kwargs)
123 95b36144 Sofia Papagiannaki
124 13bf6cd8 Sofia Papagiannaki
    def test_no_authorization_granted(self):
125 13bf6cd8 Sofia Papagiannaki
        r = self.get(self.view_url)
126 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 302)
127 95b36144 Sofia Papagiannaki
        self.assertTrue('Location' in r)
128 13bf6cd8 Sofia Papagiannaki
        p = urlparse.urlparse(r['Location'])
129 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(p.netloc, 'testserver')
130 f4f948c0 Sofia Papagiannaki
        self.assertEqual(p.path, '/astakos/oauth2/auth')
131 13bf6cd8 Sofia Papagiannaki
132 13bf6cd8 Sofia Papagiannaki
        r = self.get(add_url_params(self.view_url, code='valid_code'),
133 13bf6cd8 Sofia Papagiannaki
                     follow=True)
134 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
135 13bf6cd8 Sofia Papagiannaki
        self.assertTrue(r.content, self.odata)
136 13bf6cd8 Sofia Papagiannaki
        intermidiate_url = r.redirect_chain[0][0]
137 13bf6cd8 Sofia Papagiannaki
        p = urlparse.urlparse(intermidiate_url)
138 13bf6cd8 Sofia Papagiannaki
        params = urlparse.parse_qs(p.query)
139 13bf6cd8 Sofia Papagiannaki
        self.assertTrue('access_token' in params)
140 13bf6cd8 Sofia Papagiannaki
141 13bf6cd8 Sofia Papagiannaki
        r = self.get(add_url_params(self.view_url, access_token='valid_token'))
142 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
143 13bf6cd8 Sofia Papagiannaki
        self.assertTrue(r.content, self.odata)
144 13bf6cd8 Sofia Papagiannaki
145 77b8a8e3 Sofia Papagiannaki
        r = self.get('%s&disposition-type=inline' %
146 77b8a8e3 Sofia Papagiannaki
                     add_url_params(self.view_url, access_token='valid_token'))
147 77b8a8e3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
148 77b8a8e3 Sofia Papagiannaki
        self.assertTrue(r.content, self.odata)
149 77b8a8e3 Sofia Papagiannaki
        self.assertTrue('Content-Disposition' in r)
150 77b8a8e3 Sofia Papagiannaki
        self.assertTrue('inline' in r['Content-Disposition'])
151 77b8a8e3 Sofia Papagiannaki
152 77b8a8e3 Sofia Papagiannaki
        r = self.get('%s&disposition-type=attachment' %
153 77b8a8e3 Sofia Papagiannaki
                     add_url_params(self.view_url, access_token='valid_token'))
154 77b8a8e3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
155 77b8a8e3 Sofia Papagiannaki
        self.assertTrue(r.content, self.odata)
156 77b8a8e3 Sofia Papagiannaki
        self.assertTrue('Content-Disposition' in r)
157 77b8a8e3 Sofia Papagiannaki
        self.assertTrue('attachment' in r['Content-Disposition'])
158 77b8a8e3 Sofia Papagiannaki
159 13bf6cd8 Sofia Papagiannaki
    def test_forbidden(self):
160 13bf6cd8 Sofia Papagiannaki
        container = self.create_container(user='alice')[0]
161 13bf6cd8 Sofia Papagiannaki
        obj = self.upload_object(container, user='alice')[0]
162 13bf6cd8 Sofia Papagiannaki
163 13bf6cd8 Sofia Papagiannaki
        url = join_urls(self.view_path, 'alice', container, obj)
164 13bf6cd8 Sofia Papagiannaki
        r = self.view(url)
165 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
166 13bf6cd8 Sofia Papagiannaki
167 13bf6cd8 Sofia Papagiannaki
    def test_shared_with_me(self):
168 13bf6cd8 Sofia Papagiannaki
        container = self.create_container(user='alice')[0]
169 13bf6cd8 Sofia Papagiannaki
        obj, data = self.upload_object(container, user='alice')[:-1]
170 13bf6cd8 Sofia Papagiannaki
171 13bf6cd8 Sofia Papagiannaki
        # share object
172 13bf6cd8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', container, obj)
173 13bf6cd8 Sofia Papagiannaki
        self.post(url, user='alice', content_type='',
174 13bf6cd8 Sofia Papagiannaki
                  HTTP_CONTENT_RANGE='bytes */*',
175 13bf6cd8 Sofia Papagiannaki
                  HTTP_X_OBJECT_SHARING='read=user')
176 13bf6cd8 Sofia Papagiannaki
177 13bf6cd8 Sofia Papagiannaki
        url = join_urls(self.view_path, 'alice', container, obj)
178 13bf6cd8 Sofia Papagiannaki
        r = self.view(url)
179 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
180 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.content, data)
181 13bf6cd8 Sofia Papagiannaki
182 13bf6cd8 Sofia Papagiannaki
    def test_view(self):
183 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url)
184 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
185 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
186 13bf6cd8 Sofia Papagiannaki
187 13bf6cd8 Sofia Papagiannaki
    def test_not_existing(self):
188 13bf6cd8 Sofia Papagiannaki
        url = self.view_url[:-1]
189 13bf6cd8 Sofia Papagiannaki
        r = self.view(url)
190 13bf6cd8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
191 95b36144 Sofia Papagiannaki
192 95b36144 Sofia Papagiannaki
    def test_versions(self):
193 95b36144 Sofia Papagiannaki
        c = self.cname
194 95b36144 Sofia Papagiannaki
        o = self.oname
195 95b36144 Sofia Papagiannaki
196 95b36144 Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
197 95b36144 Sofia Papagiannaki
        r = self.post(self.api_url, content_type='', **meta)
198 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
199 95b36144 Sofia Papagiannaki
200 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?version=list&format=json' % self.view_url)
201 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
202 95b36144 Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
203 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
204 95b36144 Sofia Papagiannaki
205 95b36144 Sofia Papagiannaki
        # update meta
206 95b36144 Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
207 95b36144 Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
208 95b36144 Sofia Papagiannaki
        r = self.post(self.api_url, content_type='', **meta)
209 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
210 95b36144 Sofia Papagiannaki
211 95b36144 Sofia Papagiannaki
        # assert a newly created version has been created
212 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?version=list&format=json' % self.view_url)
213 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
214 95b36144 Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
215 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
216 95b36144 Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
217 95b36144 Sofia Papagiannaki
218 95b36144 Sofia Papagiannaki
        vserial, _ = l2[-2]
219 95b36144 Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
220 6ee6677e Sofia Papagiannaki
                         {'Quality': 'AAA'})
221 95b36144 Sofia Papagiannaki
222 95b36144 Sofia Papagiannaki
        # update data
223 95b36144 Sofia Papagiannaki
        self.append_object_data(c, o)
224 95b36144 Sofia Papagiannaki
225 95b36144 Sofia Papagiannaki
        # assert a newly created version has been created
226 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?version=list&format=json' % self.view_url)
227 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
228 95b36144 Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
229 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
230 95b36144 Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
231 95b36144 Sofia Papagiannaki
232 95b36144 Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
233 95b36144 Sofia Papagiannaki
        cname = self.cname
234 95b36144 Sofia Papagiannaki
235 13bf6cd8 Sofia Papagiannaki
        r = self.view(quote('%s ' % self.view_url))
236 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
237 95b36144 Sofia Papagiannaki
238 95b36144 Sofia Papagiannaki
        # delete object
239 95b36144 Sofia Papagiannaki
        self.delete(self.api_url)
240 95b36144 Sofia Papagiannaki
241 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url)
242 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
243 95b36144 Sofia Papagiannaki
244 95b36144 Sofia Papagiannaki
        # upload object with trailing space
245 95b36144 Sofia Papagiannaki
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
246 95b36144 Sofia Papagiannaki
247 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, cname, oname)
248 13bf6cd8 Sofia Papagiannaki
        r = self.view(view_url)
249 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
250 95b36144 Sofia Papagiannaki
251 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
252 13bf6cd8 Sofia Papagiannaki
        r = self.view(view_url)
253 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
254 95b36144 Sofia Papagiannaki
255 95b36144 Sofia Papagiannaki
    def test_get_partial(self):
256 95b36144 Sofia Papagiannaki
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
257 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
258 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
259 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata[:limit + 1])
260 95b36144 Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
261 95b36144 Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
262 95b36144 Sofia Papagiannaki
            limit, len(self.odata)))
263 95b36144 Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
264 95b36144 Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
265 95b36144 Sofia Papagiannaki
266 95b36144 Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
267 95b36144 Sofia Papagiannaki
        # TODO
268 13bf6cd8 Sofia Papagiannaki
        #r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
269 95b36144 Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
270 95b36144 Sofia Papagiannaki
271 95b36144 Sofia Papagiannaki
        offset = len(self.odata) + 1
272 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
273 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
274 95b36144 Sofia Papagiannaki
275 95b36144 Sofia Papagiannaki
    def test_multiple_range(self):
276 95b36144 Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
277 95b36144 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
278 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE=ranges)
279 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
280 95b36144 Sofia Papagiannaki
        self.assertTrue('content-type' in r)
281 95b36144 Sofia Papagiannaki
        p = re.compile(
282 95b36144 Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
283 95b36144 Sofia Papagiannaki
            re.I)
284 95b36144 Sofia Papagiannaki
        m = p.match(r['content-type'])
285 95b36144 Sofia Papagiannaki
        if m is None:
286 95b36144 Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
287 95b36144 Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
288 95b36144 Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
289 95b36144 Sofia Papagiannaki
290 95b36144 Sofia Papagiannaki
        # assert content parts length
291 95b36144 Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
292 95b36144 Sofia Papagiannaki
293 95b36144 Sofia Papagiannaki
        # for each content part assert headers
294 95b36144 Sofia Papagiannaki
        i = 0
295 95b36144 Sofia Papagiannaki
        for cpart in cparts:
296 95b36144 Sofia Papagiannaki
            content = cpart.split('\r\n')
297 95b36144 Sofia Papagiannaki
            headers = content[1:3]
298 95b36144 Sofia Papagiannaki
            content_range = headers[0].split(': ')
299 95b36144 Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
300 95b36144 Sofia Papagiannaki
301 95b36144 Sofia Papagiannaki
            r = l[i].split('-')
302 95b36144 Sofia Papagiannaki
            if not r[0] and not r[1]:
303 95b36144 Sofia Papagiannaki
                pass
304 95b36144 Sofia Papagiannaki
            elif not r[0]:
305 95b36144 Sofia Papagiannaki
                start = len(self.odata) - int(r[1])
306 95b36144 Sofia Papagiannaki
                end = len(self.odata)
307 95b36144 Sofia Papagiannaki
            elif not r[1]:
308 95b36144 Sofia Papagiannaki
                start = int(r[0])
309 95b36144 Sofia Papagiannaki
                end = len(self.odata)
310 95b36144 Sofia Papagiannaki
            else:
311 95b36144 Sofia Papagiannaki
                start = int(r[0])
312 95b36144 Sofia Papagiannaki
                end = int(r[1]) + 1
313 95b36144 Sofia Papagiannaki
            fdata = self.odata[start:end]
314 95b36144 Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
315 95b36144 Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
316 95b36144 Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
317 95b36144 Sofia Papagiannaki
            i += 1
318 95b36144 Sofia Papagiannaki
319 95b36144 Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
320 95b36144 Sofia Papagiannaki
        # perform get with multiple range
321 95b36144 Sofia Papagiannaki
        out_of_range = len(self.odata) + 1
322 95b36144 Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
323 95b36144 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
324 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_RANGE=ranges)
325 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
326 95b36144 Sofia Papagiannaki
327 95b36144 Sofia Papagiannaki
    def test_get_if_match(self):
328 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
329 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
330 95b36144 Sofia Papagiannaki
        else:
331 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
332 95b36144 Sofia Papagiannaki
333 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH=etag)
334 95b36144 Sofia Papagiannaki
335 95b36144 Sofia Papagiannaki
        # assert get success
336 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
337 95b36144 Sofia Papagiannaki
338 95b36144 Sofia Papagiannaki
        # assert response content
339 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
340 95b36144 Sofia Papagiannaki
341 95b36144 Sofia Papagiannaki
    def test_get_if_match_star(self):
342 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH='*')
343 95b36144 Sofia Papagiannaki
344 95b36144 Sofia Papagiannaki
        # assert get success
345 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
346 95b36144 Sofia Papagiannaki
347 95b36144 Sofia Papagiannaki
        # assert response content
348 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
349 95b36144 Sofia Papagiannaki
350 95b36144 Sofia Papagiannaki
    def test_get_multiple_if_match(self):
351 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
352 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
353 95b36144 Sofia Papagiannaki
        else:
354 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
355 95b36144 Sofia Papagiannaki
356 95b36144 Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
357 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
358 95b36144 Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
359 95b36144 Sofia Papagiannaki
360 95b36144 Sofia Papagiannaki
        # assert get success
361 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
362 95b36144 Sofia Papagiannaki
363 95b36144 Sofia Papagiannaki
        # assert response content
364 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
365 95b36144 Sofia Papagiannaki
366 95b36144 Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
367 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
368 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
369 95b36144 Sofia Papagiannaki
370 95b36144 Sofia Papagiannaki
    def test_if_none_match(self):
371 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
372 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
373 95b36144 Sofia Papagiannaki
        else:
374 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
375 95b36144 Sofia Papagiannaki
376 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match
377 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
378 95b36144 Sofia Papagiannaki
379 95b36144 Sofia Papagiannaki
        # assert precondition_failed
380 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
381 95b36144 Sofia Papagiannaki
382 95b36144 Sofia Papagiannaki
        # update object data
383 95b36144 Sofia Papagiannaki
        r = self.append_object_data(self.cname, self.oname)[-1]
384 95b36144 Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
385 95b36144 Sofia Papagiannaki
386 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match
387 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
388 95b36144 Sofia Papagiannaki
389 95b36144 Sofia Papagiannaki
        # assert get success
390 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
391 95b36144 Sofia Papagiannaki
392 95b36144 Sofia Papagiannaki
    def test_if_none_match_star(self):
393 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match with star
394 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
395 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
396 95b36144 Sofia Papagiannaki
397 95b36144 Sofia Papagiannaki
    def test_if_modified_since(self):
398 95b36144 Sofia Papagiannaki
        # upload object
399 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
400 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
401 95b36144 Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
402 95b36144 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
403 95b36144 Sofia Papagiannaki
404 95b36144 Sofia Papagiannaki
        # Check not modified since
405 95b36144 Sofia Papagiannaki
        for t in t1_formats:
406 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
407 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
408 95b36144 Sofia Papagiannaki
409 95b36144 Sofia Papagiannaki
        _time.sleep(1)
410 95b36144 Sofia Papagiannaki
411 95b36144 Sofia Papagiannaki
        # update object data
412 95b36144 Sofia Papagiannaki
        appended_data = self.append_object_data(self.cname, self.oname)[1]
413 95b36144 Sofia Papagiannaki
414 95b36144 Sofia Papagiannaki
        # Check modified since
415 95b36144 Sofia Papagiannaki
        for t in t1_formats:
416 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
417 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
418 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata + appended_data)
419 95b36144 Sofia Papagiannaki
420 95b36144 Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
421 13bf6cd8 Sofia Papagiannaki
        r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
422 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
423 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
424 95b36144 Sofia Papagiannaki
425 95b36144 Sofia Papagiannaki
    def test_if_not_modified_since(self):
426 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
427 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
428 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
429 95b36144 Sofia Papagiannaki
430 95b36144 Sofia Papagiannaki
        # Check unmodified
431 95b36144 Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
432 95b36144 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
433 95b36144 Sofia Papagiannaki
        for t in t1_formats:
434 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
435 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
436 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata)
437 95b36144 Sofia Papagiannaki
438 95b36144 Sofia Papagiannaki
        # modify object
439 95b36144 Sofia Papagiannaki
        _time.sleep(2)
440 95b36144 Sofia Papagiannaki
        self.append_object_data(self.cname, self.oname)
441 95b36144 Sofia Papagiannaki
442 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
443 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
444 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
445 95b36144 Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
446 95b36144 Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
447 95b36144 Sofia Papagiannaki
448 95b36144 Sofia Papagiannaki
        # check modified
449 95b36144 Sofia Papagiannaki
        for t in t2_formats:
450 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
451 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
452 95b36144 Sofia Papagiannaki
453 95b36144 Sofia Papagiannaki
        # modify account: update object meta
454 95b36144 Sofia Papagiannaki
        _time.sleep(1)
455 95b36144 Sofia Papagiannaki
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
456 95b36144 Sofia Papagiannaki
457 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
458 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
459 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
460 95b36144 Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
461 95b36144 Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
462 95b36144 Sofia Papagiannaki
463 95b36144 Sofia Papagiannaki
        # check modified
464 95b36144 Sofia Papagiannaki
        for t in t3_formats:
465 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
466 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
467 95b36144 Sofia Papagiannaki
468 95b36144 Sofia Papagiannaki
    def test_if_unmodified_since(self):
469 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
470 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
471 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
472 95b36144 Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
473 95b36144 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
474 95b36144 Sofia Papagiannaki
475 95b36144 Sofia Papagiannaki
        for tf in t_formats:
476 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
477 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
478 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata)
479 95b36144 Sofia Papagiannaki
480 95b36144 Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
481 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
482 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
483 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
484 95b36144 Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
485 95b36144 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
486 95b36144 Sofia Papagiannaki
487 95b36144 Sofia Papagiannaki
        for tf in t_formats:
488 13bf6cd8 Sofia Papagiannaki
            r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
489 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
490 95b36144 Sofia Papagiannaki
491 95b36144 Sofia Papagiannaki
    def test_hashes(self):
492 95b36144 Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
493 95b36144 Sofia Papagiannaki
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
494 95b36144 Sofia Papagiannaki
        size = len(odata)
495 95b36144 Sofia Papagiannaki
496 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
497 13bf6cd8 Sofia Papagiannaki
        r = self.view('%s?format=json&hashmap' % view_url)
498 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
499 95b36144 Sofia Papagiannaki
        body = json.loads(r.content)
500 95b36144 Sofia Papagiannaki
501 95b36144 Sofia Papagiannaki
        hashes = body['hashes']
502 95b36144 Sofia Papagiannaki
        block_size = body['block_size']
503 95b36144 Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
504 95b36144 Sofia Papagiannaki
            size / block_size + 1
505 95b36144 Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
506 95b36144 Sofia Papagiannaki
        i = 0
507 95b36144 Sofia Papagiannaki
        for h in hashes:
508 95b36144 Sofia Papagiannaki
            start = i * block_size
509 95b36144 Sofia Papagiannaki
            end = (i + 1) * block_size
510 95b36144 Sofia Papagiannaki
            hash = merkle(odata[start:end])
511 95b36144 Sofia Papagiannaki
            self.assertEqual(h, hash)
512 95b36144 Sofia Papagiannaki
            i += 1