Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / views.py @ 5ecaeaa6

History | View | Annotate | Download (14.8 kB)

1 95b36144 Sofia Papagiannaki
#!/usr/bin/env python
2 95b36144 Sofia Papagiannaki
#coding=utf8
3 95b36144 Sofia Papagiannaki
4 95b36144 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 95b36144 Sofia Papagiannaki
#
6 95b36144 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 95b36144 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 95b36144 Sofia Papagiannaki
# conditions are met:
9 95b36144 Sofia Papagiannaki
#
10 95b36144 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 95b36144 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 95b36144 Sofia Papagiannaki
#      disclaimer.
13 95b36144 Sofia Papagiannaki
#
14 95b36144 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 95b36144 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 95b36144 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 95b36144 Sofia Papagiannaki
#      provided with the distribution.
18 95b36144 Sofia Papagiannaki
#
19 95b36144 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 95b36144 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 95b36144 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 95b36144 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 95b36144 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 95b36144 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 95b36144 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 95b36144 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 95b36144 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 95b36144 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 95b36144 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 95b36144 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 95b36144 Sofia Papagiannaki
#
32 95b36144 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 95b36144 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 95b36144 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 95b36144 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 95b36144 Sofia Papagiannaki
37 95b36144 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
38 95b36144 Sofia Papagiannaki
from pithos.api.test import PithosAPITest, DATE_FORMATS
39 95b36144 Sofia Papagiannaki
from pithos.api.test.util import (md5_hash, get_random_data, get_random_name)
40 95b36144 Sofia Papagiannaki
from pithos.api.test.objects import merkle
41 95b36144 Sofia Papagiannaki
42 95b36144 Sofia Papagiannaki
from synnefo.lib.services import get_service_path
43 95b36144 Sofia Papagiannaki
from synnefo.lib import join_urls
44 95b36144 Sofia Papagiannaki
45 95b36144 Sofia Papagiannaki
from mock import patch
46 95b36144 Sofia Papagiannaki
from urllib import quote
47 95b36144 Sofia Papagiannaki
from urlparse import urlsplit, parse_qs
48 95b36144 Sofia Papagiannaki
49 95b36144 Sofia Papagiannaki
import django.utils.simplejson as json
50 95b36144 Sofia Papagiannaki
51 95b36144 Sofia Papagiannaki
import re
52 95b36144 Sofia Papagiannaki
import datetime
53 95b36144 Sofia Papagiannaki
import time as _time
54 95b36144 Sofia Papagiannaki
import random
55 95b36144 Sofia Papagiannaki
56 95b36144 Sofia Papagiannaki
57 95b36144 Sofia Papagiannaki
class ObjectGetView(PithosAPITest):
58 95b36144 Sofia Papagiannaki
    def setUp(self):
59 95b36144 Sofia Papagiannaki
        PithosAPITest.setUp(self)
60 95b36144 Sofia Papagiannaki
        self.cname = self.create_container()[0]
61 95b36144 Sofia Papagiannaki
        self.oname, self.odata = self.upload_object(self.cname)[:-1]
62 95b36144 Sofia Papagiannaki
63 95b36144 Sofia Papagiannaki
        self.view_path = join_urls(get_service_path(
64 95b36144 Sofia Papagiannaki
            pithos_settings.pithos_services, 'pithos_ui'), 'view')
65 95b36144 Sofia Papagiannaki
        self.view_url = join_urls(self.view_path, self.user, self.cname,
66 95b36144 Sofia Papagiannaki
                                  self.oname)
67 95b36144 Sofia Papagiannaki
        self.api_url = join_urls(self.pithos_path, self.user, self.cname,
68 95b36144 Sofia Papagiannaki
                                 self.oname)
69 95b36144 Sofia Papagiannaki
70 95b36144 Sofia Papagiannaki
    def get(self, url, user='user', *args, **kwargs):
71 95b36144 Sofia Papagiannaki
        with patch("pithos.api.util.get_token_from_cookie") as m:
72 95b36144 Sofia Papagiannaki
            m.return_value = 'token'
73 95b36144 Sofia Papagiannaki
            return super(ObjectGetView, self).get(url, user='user', *args,
74 95b36144 Sofia Papagiannaki
                                                  **kwargs)
75 95b36144 Sofia Papagiannaki
76 95b36144 Sofia Papagiannaki
    def test_no_cookie_redirect(self):
77 95b36144 Sofia Papagiannaki
        r = super(ObjectGetView, self).get(self.view_url)
78 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 302)
79 95b36144 Sofia Papagiannaki
        self.assertTrue('Location' in r)
80 95b36144 Sofia Papagiannaki
        parts = list(urlsplit(r['Location']))
81 95b36144 Sofia Papagiannaki
        qs = parse_qs(parts[3])
82 95b36144 Sofia Papagiannaki
        self.assertTrue('next' in qs)
83 5ecaeaa6 Sofia Papagiannaki
        self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST,
84 5ecaeaa6 Sofia Papagiannaki
                                                  self.view_url))
85 95b36144 Sofia Papagiannaki
86 95b36144 Sofia Papagiannaki
    def test_versions(self):
87 95b36144 Sofia Papagiannaki
        c = self.cname
88 95b36144 Sofia Papagiannaki
        o = self.oname
89 95b36144 Sofia Papagiannaki
90 95b36144 Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
91 95b36144 Sofia Papagiannaki
        r = self.post(self.api_url, content_type='', **meta)
92 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
93 95b36144 Sofia Papagiannaki
94 95b36144 Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % self.view_url)
95 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
96 95b36144 Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
97 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
98 95b36144 Sofia Papagiannaki
99 95b36144 Sofia Papagiannaki
        # update meta
100 95b36144 Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
101 95b36144 Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
102 95b36144 Sofia Papagiannaki
        r = self.post(self.api_url, content_type='', **meta)
103 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
104 95b36144 Sofia Papagiannaki
105 95b36144 Sofia Papagiannaki
        # assert a newly created version has been created
106 95b36144 Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % self.view_url)
107 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
108 95b36144 Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
109 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
110 95b36144 Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
111 95b36144 Sofia Papagiannaki
112 95b36144 Sofia Papagiannaki
        vserial, _ = l2[-2]
113 95b36144 Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
114 95b36144 Sofia Papagiannaki
                         {'X-Object-Meta-Quality': 'AAA'})
115 95b36144 Sofia Papagiannaki
116 95b36144 Sofia Papagiannaki
        # update data
117 95b36144 Sofia Papagiannaki
        self.append_object_data(c, o)
118 95b36144 Sofia Papagiannaki
119 95b36144 Sofia Papagiannaki
        # assert a newly created version has been created
120 95b36144 Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % self.view_url)
121 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
122 95b36144 Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
123 95b36144 Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
124 95b36144 Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
125 95b36144 Sofia Papagiannaki
126 95b36144 Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
127 95b36144 Sofia Papagiannaki
        cname = self.cname
128 95b36144 Sofia Papagiannaki
129 95b36144 Sofia Papagiannaki
        r = self.get(quote('%s ' % self.view_url))
130 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
131 95b36144 Sofia Papagiannaki
132 95b36144 Sofia Papagiannaki
        # delete object
133 95b36144 Sofia Papagiannaki
        self.delete(self.api_url)
134 95b36144 Sofia Papagiannaki
135 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url)
136 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
137 95b36144 Sofia Papagiannaki
138 95b36144 Sofia Papagiannaki
        # upload object with trailing space
139 95b36144 Sofia Papagiannaki
        oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
140 95b36144 Sofia Papagiannaki
141 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, cname, oname)
142 95b36144 Sofia Papagiannaki
        r = self.get(view_url)
143 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
144 95b36144 Sofia Papagiannaki
145 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
146 95b36144 Sofia Papagiannaki
        r = self.get(view_url)
147 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
148 95b36144 Sofia Papagiannaki
149 95b36144 Sofia Papagiannaki
    def test_get_partial(self):
150 95b36144 Sofia Papagiannaki
        limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
151 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
152 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
153 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata[:limit + 1])
154 95b36144 Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
155 95b36144 Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-%d/%d' % (
156 95b36144 Sofia Papagiannaki
            limit, len(self.odata)))
157 95b36144 Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
158 95b36144 Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
159 95b36144 Sofia Papagiannaki
160 95b36144 Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
161 95b36144 Sofia Papagiannaki
        # TODO
162 95b36144 Sofia Papagiannaki
        #r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
163 95b36144 Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
164 95b36144 Sofia Papagiannaki
165 95b36144 Sofia Papagiannaki
        offset = len(self.odata) + 1
166 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
167 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
168 95b36144 Sofia Papagiannaki
169 95b36144 Sofia Papagiannaki
    def test_multiple_range(self):
170 95b36144 Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
171 95b36144 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
172 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_RANGE=ranges)
173 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
174 95b36144 Sofia Papagiannaki
        self.assertTrue('content-type' in r)
175 95b36144 Sofia Papagiannaki
        p = re.compile(
176 95b36144 Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
177 95b36144 Sofia Papagiannaki
            re.I)
178 95b36144 Sofia Papagiannaki
        m = p.match(r['content-type'])
179 95b36144 Sofia Papagiannaki
        if m is None:
180 95b36144 Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
181 95b36144 Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
182 95b36144 Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
183 95b36144 Sofia Papagiannaki
184 95b36144 Sofia Papagiannaki
        # assert content parts length
185 95b36144 Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
186 95b36144 Sofia Papagiannaki
187 95b36144 Sofia Papagiannaki
        # for each content part assert headers
188 95b36144 Sofia Papagiannaki
        i = 0
189 95b36144 Sofia Papagiannaki
        for cpart in cparts:
190 95b36144 Sofia Papagiannaki
            content = cpart.split('\r\n')
191 95b36144 Sofia Papagiannaki
            headers = content[1:3]
192 95b36144 Sofia Papagiannaki
            content_range = headers[0].split(': ')
193 95b36144 Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
194 95b36144 Sofia Papagiannaki
195 95b36144 Sofia Papagiannaki
            r = l[i].split('-')
196 95b36144 Sofia Papagiannaki
            if not r[0] and not r[1]:
197 95b36144 Sofia Papagiannaki
                pass
198 95b36144 Sofia Papagiannaki
            elif not r[0]:
199 95b36144 Sofia Papagiannaki
                start = len(self.odata) - int(r[1])
200 95b36144 Sofia Papagiannaki
                end = len(self.odata)
201 95b36144 Sofia Papagiannaki
            elif not r[1]:
202 95b36144 Sofia Papagiannaki
                start = int(r[0])
203 95b36144 Sofia Papagiannaki
                end = len(self.odata)
204 95b36144 Sofia Papagiannaki
            else:
205 95b36144 Sofia Papagiannaki
                start = int(r[0])
206 95b36144 Sofia Papagiannaki
                end = int(r[1]) + 1
207 95b36144 Sofia Papagiannaki
            fdata = self.odata[start:end]
208 95b36144 Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
209 95b36144 Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
210 95b36144 Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
211 95b36144 Sofia Papagiannaki
            i += 1
212 95b36144 Sofia Papagiannaki
213 95b36144 Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
214 95b36144 Sofia Papagiannaki
        # perform get with multiple range
215 95b36144 Sofia Papagiannaki
        out_of_range = len(self.odata) + 1
216 95b36144 Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
217 95b36144 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
218 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_RANGE=ranges)
219 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
220 95b36144 Sofia Papagiannaki
221 95b36144 Sofia Papagiannaki
    def test_get_if_match(self):
222 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
223 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
224 95b36144 Sofia Papagiannaki
        else:
225 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
226 95b36144 Sofia Papagiannaki
227 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_MATCH=etag)
228 95b36144 Sofia Papagiannaki
229 95b36144 Sofia Papagiannaki
        # assert get success
230 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
231 95b36144 Sofia Papagiannaki
232 95b36144 Sofia Papagiannaki
        # assert response content
233 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
234 95b36144 Sofia Papagiannaki
235 95b36144 Sofia Papagiannaki
    def test_get_if_match_star(self):
236 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_MATCH='*')
237 95b36144 Sofia Papagiannaki
238 95b36144 Sofia Papagiannaki
        # assert get success
239 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
240 95b36144 Sofia Papagiannaki
241 95b36144 Sofia Papagiannaki
        # assert response content
242 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
243 95b36144 Sofia Papagiannaki
244 95b36144 Sofia Papagiannaki
    def test_get_multiple_if_match(self):
245 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
246 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
247 95b36144 Sofia Papagiannaki
        else:
248 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
249 95b36144 Sofia Papagiannaki
250 95b36144 Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
251 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
252 95b36144 Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
253 95b36144 Sofia Papagiannaki
254 95b36144 Sofia Papagiannaki
        # assert get success
255 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
256 95b36144 Sofia Papagiannaki
257 95b36144 Sofia Papagiannaki
        # assert response content
258 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
259 95b36144 Sofia Papagiannaki
260 95b36144 Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
261 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
262 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
263 95b36144 Sofia Papagiannaki
264 95b36144 Sofia Papagiannaki
    def test_if_none_match(self):
265 95b36144 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
266 95b36144 Sofia Papagiannaki
            etag = md5_hash(self.odata)
267 95b36144 Sofia Papagiannaki
        else:
268 95b36144 Sofia Papagiannaki
            etag = merkle(self.odata)
269 95b36144 Sofia Papagiannaki
270 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match
271 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
272 95b36144 Sofia Papagiannaki
273 95b36144 Sofia Papagiannaki
        # assert precondition_failed
274 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
275 95b36144 Sofia Papagiannaki
276 95b36144 Sofia Papagiannaki
        # update object data
277 95b36144 Sofia Papagiannaki
        r = self.append_object_data(self.cname, self.oname)[-1]
278 95b36144 Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
279 95b36144 Sofia Papagiannaki
280 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match
281 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
282 95b36144 Sofia Papagiannaki
283 95b36144 Sofia Papagiannaki
        # assert get success
284 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
285 95b36144 Sofia Papagiannaki
286 95b36144 Sofia Papagiannaki
    def test_if_none_match_star(self):
287 95b36144 Sofia Papagiannaki
        # perform get with If-None-Match with star
288 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
289 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
290 95b36144 Sofia Papagiannaki
291 95b36144 Sofia Papagiannaki
    def test_if_modified_since(self):
292 95b36144 Sofia Papagiannaki
        # upload object
293 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
294 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
295 95b36144 Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
296 95b36144 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
297 95b36144 Sofia Papagiannaki
298 95b36144 Sofia Papagiannaki
        # Check not modified since
299 95b36144 Sofia Papagiannaki
        for t in t1_formats:
300 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
301 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
302 95b36144 Sofia Papagiannaki
303 95b36144 Sofia Papagiannaki
        _time.sleep(1)
304 95b36144 Sofia Papagiannaki
305 95b36144 Sofia Papagiannaki
        # update object data
306 95b36144 Sofia Papagiannaki
        appended_data = self.append_object_data(self.cname, self.oname)[1]
307 95b36144 Sofia Papagiannaki
308 95b36144 Sofia Papagiannaki
        # Check modified since
309 95b36144 Sofia Papagiannaki
        for t in t1_formats:
310 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
311 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
312 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata + appended_data)
313 95b36144 Sofia Papagiannaki
314 95b36144 Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
315 95b36144 Sofia Papagiannaki
        r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
316 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
317 95b36144 Sofia Papagiannaki
        self.assertEqual(r.content, self.odata)
318 95b36144 Sofia Papagiannaki
319 95b36144 Sofia Papagiannaki
    def test_if_not_modified_since(self):
320 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
321 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
322 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
323 95b36144 Sofia Papagiannaki
324 95b36144 Sofia Papagiannaki
        # Check unmodified
325 95b36144 Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
326 95b36144 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
327 95b36144 Sofia Papagiannaki
        for t in t1_formats:
328 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
329 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
330 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata)
331 95b36144 Sofia Papagiannaki
332 95b36144 Sofia Papagiannaki
        # modify object
333 95b36144 Sofia Papagiannaki
        _time.sleep(2)
334 95b36144 Sofia Papagiannaki
        self.append_object_data(self.cname, self.oname)
335 95b36144 Sofia Papagiannaki
336 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
337 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
338 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
339 95b36144 Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
340 95b36144 Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
341 95b36144 Sofia Papagiannaki
342 95b36144 Sofia Papagiannaki
        # check modified
343 95b36144 Sofia Papagiannaki
        for t in t2_formats:
344 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
345 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
346 95b36144 Sofia Papagiannaki
347 95b36144 Sofia Papagiannaki
        # modify account: update object meta
348 95b36144 Sofia Papagiannaki
        _time.sleep(1)
349 95b36144 Sofia Papagiannaki
        self.update_object_meta(self.cname, self.oname, {'foo': 'bar'})
350 95b36144 Sofia Papagiannaki
351 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
352 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
353 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
354 95b36144 Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
355 95b36144 Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
356 95b36144 Sofia Papagiannaki
357 95b36144 Sofia Papagiannaki
        # check modified
358 95b36144 Sofia Papagiannaki
        for t in t3_formats:
359 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
360 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
361 95b36144 Sofia Papagiannaki
362 95b36144 Sofia Papagiannaki
    def test_if_unmodified_since(self):
363 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
364 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
365 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
366 95b36144 Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
367 95b36144 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
368 95b36144 Sofia Papagiannaki
369 95b36144 Sofia Papagiannaki
        for tf in t_formats:
370 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
371 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
372 95b36144 Sofia Papagiannaki
            self.assertEqual(r.content, self.odata)
373 95b36144 Sofia Papagiannaki
374 95b36144 Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
375 95b36144 Sofia Papagiannaki
        object_info = self.get_object_info(self.cname, self.oname)
376 95b36144 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
377 95b36144 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
378 95b36144 Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
379 95b36144 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
380 95b36144 Sofia Papagiannaki
381 95b36144 Sofia Papagiannaki
        for tf in t_formats:
382 95b36144 Sofia Papagiannaki
            r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
383 95b36144 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
384 95b36144 Sofia Papagiannaki
385 95b36144 Sofia Papagiannaki
    def test_hashes(self):
386 95b36144 Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
387 95b36144 Sofia Papagiannaki
        oname, odata = self.upload_object(self.cname, length=l)[:-1]
388 95b36144 Sofia Papagiannaki
        size = len(odata)
389 95b36144 Sofia Papagiannaki
390 95b36144 Sofia Papagiannaki
        view_url = join_urls(self.view_path, self.user, self.cname, oname)
391 95b36144 Sofia Papagiannaki
        r = self.get('%s?format=json&hashmap' % view_url)
392 95b36144 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
393 95b36144 Sofia Papagiannaki
        body = json.loads(r.content)
394 95b36144 Sofia Papagiannaki
395 95b36144 Sofia Papagiannaki
        hashes = body['hashes']
396 95b36144 Sofia Papagiannaki
        block_size = body['block_size']
397 95b36144 Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
398 95b36144 Sofia Papagiannaki
            size / block_size + 1
399 95b36144 Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
400 95b36144 Sofia Papagiannaki
        i = 0
401 95b36144 Sofia Papagiannaki
        for h in hashes:
402 95b36144 Sofia Papagiannaki
            start = i * block_size
403 95b36144 Sofia Papagiannaki
            end = (i + 1) * block_size
404 95b36144 Sofia Papagiannaki
            hash = merkle(odata[start:end])
405 95b36144 Sofia Papagiannaki
            self.assertEqual(h, hash)
406 95b36144 Sofia Papagiannaki
            i += 1